ARTEMIS-813 Ensure no duplicate journal records on address update
This commit is contained in:
parent
6ab133ab89
commit
af27714026
|
@ -58,12 +58,20 @@ public interface AddressManager {
|
||||||
|
|
||||||
Set<SimpleString> getAddresses();
|
Set<SimpleString> getAddresses();
|
||||||
|
|
||||||
AddressInfo addAddressInfo(AddressInfo addressInfo);
|
/**
|
||||||
|
* @param addressInfo
|
||||||
|
* @return true if the address was added, false if it wasn't added
|
||||||
|
*/
|
||||||
|
boolean addAddressInfo(AddressInfo addressInfo);
|
||||||
|
|
||||||
AddressInfo updateAddressInfoIfPresent(SimpleString addressName,
|
AddressInfo updateAddressInfoIfPresent(SimpleString addressName,
|
||||||
BiFunction<? super SimpleString, ? super AddressInfo, ? extends AddressInfo> remappingFunction);
|
BiFunction<? super SimpleString, ? super AddressInfo, ? extends AddressInfo> remappingFunction);
|
||||||
|
|
||||||
AddressInfo addOrUpdateAddressInfo(AddressInfo addressInfo);
|
/**
|
||||||
|
* @param addressInfo
|
||||||
|
* @return true if the address was added, false if it was updated
|
||||||
|
*/
|
||||||
|
boolean addOrUpdateAddressInfo(AddressInfo addressInfo);
|
||||||
|
|
||||||
AddressInfo removeAddressInfo(SimpleString address);
|
AddressInfo removeAddressInfo(SimpleString address);
|
||||||
|
|
||||||
|
|
|
@ -45,9 +45,17 @@ import org.apache.activemq.artemis.core.transaction.Transaction;
|
||||||
*/
|
*/
|
||||||
public interface PostOffice extends ActiveMQComponent {
|
public interface PostOffice extends ActiveMQComponent {
|
||||||
|
|
||||||
AddressInfo addAddressInfo(AddressInfo addressInfo);
|
/**
|
||||||
|
* @param addressInfo
|
||||||
|
* @return true if the address was added, false if it wasn't added
|
||||||
|
*/
|
||||||
|
boolean addAddressInfo(AddressInfo addressInfo);
|
||||||
|
|
||||||
AddressInfo addOrUpdateAddressInfo(AddressInfo addressInfo);
|
/**
|
||||||
|
* @param addressInfo
|
||||||
|
* @return true if the address was added, false if it was updated
|
||||||
|
*/
|
||||||
|
boolean addOrUpdateAddressInfo(AddressInfo addressInfo);
|
||||||
|
|
||||||
AddressInfo removeAddressInfo(SimpleString address) throws Exception;
|
AddressInfo removeAddressInfo(SimpleString address) throws Exception;
|
||||||
|
|
||||||
|
|
|
@ -424,26 +424,34 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
|
||||||
// PostOffice implementation -----------------------------------------------
|
// PostOffice implementation -----------------------------------------------
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public AddressInfo addAddressInfo(AddressInfo addressInfo) {
|
public boolean addAddressInfo(AddressInfo addressInfo) {
|
||||||
synchronized (addressLock) {
|
synchronized (addressLock) {
|
||||||
|
boolean result = addressManager.addAddressInfo(addressInfo);
|
||||||
|
// only register address if it is new
|
||||||
|
if (result) {
|
||||||
try {
|
try {
|
||||||
managementService.registerAddress(addressInfo);
|
managementService.registerAddress(addressInfo);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
e.printStackTrace();
|
e.printStackTrace();
|
||||||
}
|
}
|
||||||
return addressManager.addAddressInfo(addressInfo);
|
}
|
||||||
|
return result;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public AddressInfo addOrUpdateAddressInfo(AddressInfo addressInfo) {
|
public boolean addOrUpdateAddressInfo(AddressInfo addressInfo) {
|
||||||
synchronized (addressLock) {
|
synchronized (addressLock) {
|
||||||
|
boolean result = addressManager.addOrUpdateAddressInfo(addressInfo);
|
||||||
|
// only register address if it is newly added
|
||||||
|
if (result) {
|
||||||
try {
|
try {
|
||||||
managementService.registerAddress(addressInfo);
|
managementService.registerAddress(addressInfo);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
e.printStackTrace();
|
e.printStackTrace();
|
||||||
}
|
}
|
||||||
return addressManager.addOrUpdateAddressInfo(addressInfo);
|
}
|
||||||
|
return result;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -30,7 +30,6 @@ import org.apache.activemq.artemis.core.postoffice.Binding;
|
||||||
import org.apache.activemq.artemis.core.postoffice.Bindings;
|
import org.apache.activemq.artemis.core.postoffice.Bindings;
|
||||||
import org.apache.activemq.artemis.core.postoffice.BindingsFactory;
|
import org.apache.activemq.artemis.core.postoffice.BindingsFactory;
|
||||||
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
|
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
|
||||||
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
|
|
||||||
import org.apache.activemq.artemis.core.server.RoutingType;
|
import org.apache.activemq.artemis.core.server.RoutingType;
|
||||||
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
|
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
|
||||||
import org.apache.activemq.artemis.core.transaction.Transaction;
|
import org.apache.activemq.artemis.core.transaction.Transaction;
|
||||||
|
@ -216,8 +215,8 @@ public class SimpleAddressManager implements AddressManager {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public AddressInfo addAddressInfo(AddressInfo addressInfo) {
|
public boolean addAddressInfo(AddressInfo addressInfo) {
|
||||||
return addressInfoMap.putIfAbsent(addressInfo.getName(), addressInfo);
|
return addressInfoMap.putIfAbsent(addressInfo.getName(), addressInfo) == null;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -226,23 +225,20 @@ public class SimpleAddressManager implements AddressManager {
|
||||||
return addressInfoMap.computeIfPresent(addressName, remappingFunction);
|
return addressInfoMap.computeIfPresent(addressName, remappingFunction);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
public boolean addOrUpdateAddressInfo(AddressInfo addressInfo) {
|
||||||
public AddressInfo addOrUpdateAddressInfo(AddressInfo addressInfo) {
|
boolean isNew = addAddressInfo(addressInfo);
|
||||||
AddressInfo from = addAddressInfo(addressInfo);
|
|
||||||
if (from != null) {
|
// address already exists so update it
|
||||||
ActiveMQServerLogger.LOGGER.info("Address " + addressInfo.getName() + " exists already as " + from + ", updating instead with: " + addressInfo);
|
if (!isNew) {
|
||||||
|
AddressInfo toUpdate = getAddressInfo(addressInfo.getName());
|
||||||
|
synchronized (toUpdate) {
|
||||||
|
for (RoutingType routingType : addressInfo.getRoutingTypes()) {
|
||||||
|
toUpdate.addRoutingType(routingType);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return (from == null) ? addressInfo : updateAddressInfo(from, addressInfo);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private AddressInfo updateAddressInfo(AddressInfo from, AddressInfo to) {
|
return isNew;
|
||||||
synchronized (from) {
|
|
||||||
for (RoutingType routingType : to.getRoutingTypes()) {
|
|
||||||
from.addRoutingType(routingType);
|
|
||||||
}
|
|
||||||
ActiveMQServerLogger.LOGGER.info("Update result: " + from);
|
|
||||||
return from;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -474,11 +474,9 @@ public interface ActiveMQServer extends ActiveMQComponent {
|
||||||
*/
|
*/
|
||||||
void removeRoutingType(String address, RoutingType routingType) throws Exception;
|
void removeRoutingType(String address, RoutingType routingType) throws Exception;
|
||||||
|
|
||||||
AddressInfo putAddressInfoIfAbsent(AddressInfo addressInfo) throws Exception;
|
boolean createAddressInfo(AddressInfo addressInfo) throws Exception;
|
||||||
|
|
||||||
void createAddressInfo(AddressInfo addressInfo) throws Exception;
|
boolean createOrUpdateAddressInfo(AddressInfo addressInfo) throws Exception;
|
||||||
|
|
||||||
AddressInfo createOrUpdateAddressInfo(AddressInfo addressInfo) throws Exception;
|
|
||||||
|
|
||||||
void removeAddressInfo(SimpleString address, SecurityAuth session) throws Exception;
|
void removeAddressInfo(SimpleString address, SecurityAuth session) throws Exception;
|
||||||
|
|
||||||
|
|
|
@ -2198,16 +2198,6 @@ public class ActiveMQServerImpl implements ActiveMQServer {
|
||||||
// Deploy any predefined queues
|
// Deploy any predefined queues
|
||||||
deployQueuesFromConfiguration();
|
deployQueuesFromConfiguration();
|
||||||
|
|
||||||
// registerPostQueueDeletionCallback(new PostQueueDeletionCallback() {
|
|
||||||
// // TODO delete auto-created addresses when queueCount == 0
|
|
||||||
// @Override
|
|
||||||
// public void callback(SimpleString address, SimpleString queueName) throws Exception {
|
|
||||||
// if (getAddressInfo(address).isAutoCreated()) {
|
|
||||||
// removeAddressInfo(address);
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
// });
|
|
||||||
|
|
||||||
// We need to call this here, this gives any dependent server a chance to deploy its own addresses
|
// We need to call this here, this gives any dependent server a chance to deploy its own addresses
|
||||||
// this needs to be done before clustering is fully activated
|
// this needs to be done before clustering is fully activated
|
||||||
callActivateCallbacks();
|
callActivateCallbacks();
|
||||||
|
@ -2408,34 +2398,34 @@ public class ActiveMQServerImpl implements ActiveMQServer {
|
||||||
postOffice.removeRoutingType(addressName,routingType);
|
postOffice.removeRoutingType(addressName,routingType);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
public boolean createAddressInfo(AddressInfo addressInfo) throws Exception {
|
||||||
public AddressInfo putAddressInfoIfAbsent(AddressInfo addressInfo) throws Exception {
|
boolean result = postOffice.addAddressInfo(addressInfo);
|
||||||
AddressInfo result = postOffice.addAddressInfo(addressInfo);
|
|
||||||
|
|
||||||
// TODO: is this the right way to do this?
|
if (result) {
|
||||||
long txID = storageManager.generateID();
|
long txID = storageManager.generateID();
|
||||||
storageManager.addAddressBinding(txID, addressInfo);
|
storageManager.addAddressBinding(txID, addressInfo);
|
||||||
storageManager.commitBindings(txID);
|
storageManager.commitBindings(txID);
|
||||||
|
} else {
|
||||||
|
throw ActiveMQMessageBundle.BUNDLE.addressAlreadyExists(addressInfo.getName());
|
||||||
|
}
|
||||||
|
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void createAddressInfo(AddressInfo addressInfo) throws Exception {
|
public boolean createOrUpdateAddressInfo(AddressInfo addressInfo) throws Exception {
|
||||||
if (putAddressInfoIfAbsent(addressInfo) != null) {
|
boolean result = postOffice.addOrUpdateAddressInfo(addressInfo);
|
||||||
throw ActiveMQMessageBundle.BUNDLE.addressAlreadyExists(addressInfo.getName());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public AddressInfo createOrUpdateAddressInfo(AddressInfo addressInfo) throws Exception {
|
|
||||||
AddressInfo result = postOffice.addOrUpdateAddressInfo(addressInfo);
|
|
||||||
|
|
||||||
// TODO: is this the right way to do this?
|
|
||||||
// TODO: deal with possible duplicates, may be adding new records when old ones already exist
|
|
||||||
long txID = storageManager.generateID();
|
long txID = storageManager.generateID();
|
||||||
|
if (result) {
|
||||||
storageManager.addAddressBinding(txID, addressInfo);
|
storageManager.addAddressBinding(txID, addressInfo);
|
||||||
storageManager.commitBindings(txID);
|
storageManager.commitBindings(txID);
|
||||||
|
} else {
|
||||||
|
AddressInfo updatedAddressInfo = getAddressInfo(addressInfo.getName());
|
||||||
|
storageManager.deleteAddressBinding(txID, updatedAddressInfo.getId());
|
||||||
|
storageManager.addAddressBinding(txID, updatedAddressInfo);
|
||||||
|
storageManager.commitBindings(txID);
|
||||||
|
}
|
||||||
|
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
@ -2452,7 +2442,6 @@ public class ActiveMQServerImpl implements ActiveMQServer {
|
||||||
throw ActiveMQMessageBundle.BUNDLE.addressDoesNotExist(address);
|
throw ActiveMQMessageBundle.BUNDLE.addressDoesNotExist(address);
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: is this the right way to do this? Should it use a transaction?
|
|
||||||
long txID = storageManager.generateID();
|
long txID = storageManager.generateID();
|
||||||
storageManager.deleteAddressBinding(txID, addressInfo.getId());
|
storageManager.deleteAddressBinding(txID, addressInfo.getId());
|
||||||
storageManager.commitBindings(txID);
|
storageManager.commitBindings(txID);
|
||||||
|
|
|
@ -166,7 +166,6 @@ public class PostOfficeJournalLoader implements JournalLoader {
|
||||||
|
|
||||||
queues.put(queue.getID(), queue);
|
queues.put(queue.getID(), queue);
|
||||||
postOffice.addBinding(binding);
|
postOffice.addBinding(binding);
|
||||||
//managementService.registerAddress(queue.getAddress());
|
|
||||||
managementService.registerQueue(queue, queue.getAddress(), storageManager);
|
managementService.registerQueue(queue, queue.getAddress(), storageManager);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -178,11 +177,8 @@ public class PostOfficeJournalLoader implements JournalLoader {
|
||||||
for (AddressBindingInfo addressBindingInfo : addressBindingInfos) {
|
for (AddressBindingInfo addressBindingInfo : addressBindingInfos) {
|
||||||
addressBindingInfosMap.put(addressBindingInfo.getId(), addressBindingInfo);
|
addressBindingInfosMap.put(addressBindingInfo.getId(), addressBindingInfo);
|
||||||
|
|
||||||
// TODO: figure out what else to set here
|
AddressInfo addressInfo = new AddressInfo(addressBindingInfo.getName()).setRoutingTypes(addressBindingInfo.getRoutingTypes());
|
||||||
AddressInfo addressInfo = new AddressInfo(addressBindingInfo.getName())
|
|
||||||
.setRoutingTypes(addressBindingInfo.getRoutingTypes());
|
|
||||||
postOffice.addAddressInfo(addressInfo);
|
postOffice.addAddressInfo(addressInfo);
|
||||||
managementService.registerAddress(addressInfo);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -585,7 +585,8 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
|
||||||
final boolean autoCreated) throws Exception {
|
final boolean autoCreated) throws Exception {
|
||||||
Pair<SimpleString, Set<RoutingType>> art = getAddressAndRoutingTypes(address, routingTypes);
|
Pair<SimpleString, Set<RoutingType>> art = getAddressAndRoutingTypes(address, routingTypes);
|
||||||
securityCheck(art.getA(), CheckType.CREATE_ADDRESS, this);
|
securityCheck(art.getA(), CheckType.CREATE_ADDRESS, this);
|
||||||
return server.createOrUpdateAddressInfo(new AddressInfo(art.getA(), art.getB()).setAutoCreated(autoCreated));
|
server.createOrUpdateAddressInfo(new AddressInfo(art.getA(), art.getB()).setAutoCreated(autoCreated));
|
||||||
|
return server.getAddressInfo(art.getA());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -594,7 +595,8 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
|
||||||
final boolean autoCreated) throws Exception {
|
final boolean autoCreated) throws Exception {
|
||||||
Pair<SimpleString, RoutingType> art = getAddressAndRoutingType(address, routingType);
|
Pair<SimpleString, RoutingType> art = getAddressAndRoutingType(address, routingType);
|
||||||
securityCheck(art.getA(), CheckType.CREATE_ADDRESS, this);
|
securityCheck(art.getA(), CheckType.CREATE_ADDRESS, this);
|
||||||
return server.createOrUpdateAddressInfo(new AddressInfo(art.getA(), art.getB()).setAutoCreated(autoCreated));
|
server.createOrUpdateAddressInfo(new AddressInfo(art.getA(), art.getB()).setAutoCreated(autoCreated));
|
||||||
|
return server.getAddressInfo(art.getA());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -85,13 +85,13 @@ public class FakePostOffice implements PostOffice {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public AddressInfo addAddressInfo(AddressInfo addressInfo) {
|
public boolean addAddressInfo(AddressInfo addressInfo) {
|
||||||
return null;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public AddressInfo addOrUpdateAddressInfo(AddressInfo addressInfo) {
|
public boolean addOrUpdateAddressInfo(AddressInfo addressInfo) {
|
||||||
return null;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue