This commit is contained in:
Clebert Suconic 2017-07-24 10:50:52 -04:00
commit 3bc0655ef7
7 changed files with 1249 additions and 6 deletions

View File

@ -84,6 +84,7 @@
<jetty.version>9.4.3.v20170317</jetty.version>
<jgroups.version>3.6.13.Final</jgroups.version>
<maven.assembly.plugin.version>2.4</maven.assembly.plugin.version>
<mockito.version>2.8.47</mockito.version>
<netty.version>4.1.9.Final</netty.version>
<proton.version>0.19.0</proton.version>
<resteasy.version>3.0.19.Final</resteasy.version>
@ -275,6 +276,14 @@
<version>${commons.collections.version}</version>
<!-- License: Apache 2.0 -->
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>${mockito.version}</version>
<scope>test</scope>
<!-- License: MIT -->
</dependency>
<!-- ## End Test Dependencies ## -->
<!-- ### Build Time Dependencies ### -->

View File

@ -62,6 +62,13 @@
<type>test-jar</type>
</dependency>
<!-- I imported this to get a mock of a class -->
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-client</artifactId>

View File

@ -46,6 +46,7 @@ import org.apache.activemq.broker.jmx.ManagementContext;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.DestinationInterceptor;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.broker.region.policy.RegionBrokerProxy;
import org.apache.activemq.broker.scheduler.JobSchedulerStore;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.BrokerId;
@ -250,14 +251,14 @@ public class BrokerService implements Service {
}
//below are methods called directly by tests
//we don't actually implement any of these for now,
//we don't actually implement many of these for now,
//just to make test compile pass.
//we may get class cast exception as in TestSupport it
//casts the broker to RegionBroker, which we didn't
//implement (wrap) yet. Consider solving it later.
public Broker getRegionBroker() {
return broker;
try {
return RegionBrokerProxy.newRegionBroker((ArtemisBrokerWrapper) getBroker());
} catch (Exception e) {
throw new RuntimeException(e);
}
}
public void setPersistenceAdapter(PersistenceAdapter persistenceAdapter) throws IOException {

View File

@ -90,6 +90,7 @@ public class ArtemisBrokerWrapper extends ArtemisBrokerBase {
}
SimpleString dla = new SimpleString("ActiveMQ.DLQ");
commonSettings.setDeadLetterAddress(dla);
commonSettings.setExpiryAddress(dla);
commonSettings.setAutoCreateQueues(true);
commonSettings.setAutoCreateAddresses(true);

View File

@ -0,0 +1,157 @@
package org.apache.activemq.broker.artemiswrapper;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.postoffice.QueueBinding;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ConsumerBrokerExchange;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.QueueRegion;
import org.apache.activemq.broker.region.Region;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.TopicRegion;
import org.apache.activemq.broker.region.policy.DestinationProxy;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ConsumerControl;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatchNotification;
import org.apache.activemq.command.MessagePull;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.RemoveSubscriptionInfo;
import org.apache.activemq.command.Response;
import org.mockito.AdditionalAnswers;
import org.mockito.Mockito;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
public class RegionProxy implements Region {
private final ActiveMQServer server;
private final RoutingType routingType;
private RegionProxy(ActiveMQServer activeMQServer, RoutingType routingType) {
this.server = activeMQServer;
this.routingType = routingType;
}
public static Region newQueueRegion(ActiveMQServer activeMQServer) {
return Mockito.mock(QueueRegion.class, AdditionalAnswers.delegatesTo(new RegionProxy(activeMQServer, RoutingType.ANYCAST)));
}
public static Region newTopicRegion(ActiveMQServer activeMQServer) {
return Mockito.mock(TopicRegion.class, AdditionalAnswers.delegatesTo(new RegionProxy(activeMQServer, RoutingType.MULTICAST)));
}
@Override
public Destination addDestination(ConnectionContext context, ActiveMQDestination destination, boolean createIfTemporary) throws Exception {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public Map<ActiveMQDestination, Destination> getDestinationMap() {
return server.getPostOffice().getAllBindings().entrySet().stream()
.filter(e -> e.getValue() instanceof QueueBinding)
.filter(e -> {
final SimpleString address = ((QueueBinding) e.getValue()).getQueue().getAddress();
return server.getAddressInfo(address).getRoutingType() == routingType;
}
)
.collect(Collectors.toMap(
e -> {
final String uniqueName = e.getValue().getUniqueName().toString();
return new ActiveMQQueue(uniqueName);
},
e -> {
final Queue queue = ((QueueBinding) e.getValue()).getQueue();
final String address = e.getValue().getAddress().toString();
return new DestinationProxy(queue, address, server);
}));
}
@Override
public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public void send(ProducerBrokerExchange producerExchange, Message message) throws Exception {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public Response messagePull(ConnectionContext context, MessagePull pull) throws Exception {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public void gc() {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public Set<Destination> getDestinations(ActiveMQDestination destination) {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public void processConsumerControl(ConsumerBrokerExchange consumerExchange, ConsumerControl control) {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public void reapplyInterceptor() {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public void start() throws Exception {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public void stop() throws Exception {
throw new UnsupportedOperationException("Not implemented yet");
}
}

View File

@ -0,0 +1,645 @@
package org.apache.activemq.broker.region.policy;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.DestinationStatistics;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatchNotification;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.management.CountStatisticImpl;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.usage.MemoryUsage;
import org.apache.activemq.usage.Usage;
import org.apache.activemq.usage.UsageCapacity;
import org.apache.activemq.usage.UsageListener;
import org.mockito.Mockito;
import org.mockito.stubbing.Answer;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.ThreadPoolExecutor;
public class DestinationProxy implements Destination {
private final String name;
private final Queue view;
private final ActiveMQServer server;
public DestinationProxy(Queue view, String name, ActiveMQServer server) {
this.view = view;
this.name = name;
this.server = server;
}
// Destination
@Override
public void addSubscription(ConnectionContext context, Subscription sub) throws Exception {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public void removeSubscription(ConnectionContext context, Subscription sub, long lastDeliveredSequenceId) throws Exception {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public void acknowledge(ConnectionContext context, Subscription sub, MessageAck ack, MessageReference node) throws IOException {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public long getInactiveTimeoutBeforeGC() {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public void markForGC(long timeStamp) {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public boolean canGC() {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public void gc() {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public ActiveMQDestination getActiveMQDestination() {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public MemoryUsage getMemoryUsage() {
return new MemoryUsage() {
@Override
public void waitForSpace() throws InterruptedException {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public boolean waitForSpace(long timeout) throws InterruptedException {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public boolean isFull() {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public void enqueueUsage(long value) throws InterruptedException {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public void increaseUsage(long value) {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public void decreaseUsage(long value) {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
protected long retrieveUsage() {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public long getUsage() {
try {
return server.getPagingManager().getPageStore(view.getAddress()).getAddressSize();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
@Override
public void setUsage(long usage) {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public void setPercentOfJvmHeap(int percentOfJvmHeap) {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public boolean waitForSpace(long timeout, int highWaterMark) throws InterruptedException {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public boolean isFull(int highWaterMark) {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public void addUsageListener(UsageListener listener) {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public void removeUsageListener(UsageListener listener) {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public int getNumUsageListeners() {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public long getLimit() {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public void setLimit(long limit) {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
protected void onLimitChange() {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public float getUsagePortion() {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public void setUsagePortion(float usagePortion) {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public int getPercentUsage() {
long total = 0;
try {
total = server.getPagingManager().getPageStore(view.getAddress()).getMaxSize();
} catch (Exception e) {
throw new RuntimeException(e);
}
return (int) ((float) getUsage() / total * 100.0);
}
@Override
protected void setPercentUsage(int value) {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public int getPercentUsageMinDelta() {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public void setPercentUsageMinDelta(int percentUsageMinDelta) {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
protected int caclPercentUsage() {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public String getName() {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public void setName(String name) {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public String toString() {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public void start() {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public void stop() {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
protected void addChild(MemoryUsage child) {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
protected void removeChild(MemoryUsage child) {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public boolean notifyCallbackWhenNotFull(Runnable callback) {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public UsageCapacity getLimiter() {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public void setLimiter(UsageCapacity limiter) {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public int getPollingTime() {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public void setPollingTime(int pollingTime) {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public MemoryUsage getParent() {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public void setParent(MemoryUsage parent) {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public ThreadPoolExecutor getExecutor() {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public void setExecutor(ThreadPoolExecutor executor) {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public boolean isStarted() {
throw new UnsupportedOperationException("Not implemented yet");
}
};
}
@Override
public void setMemoryUsage(MemoryUsage memoryUsage) {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public void dispose(ConnectionContext context) throws IOException {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public boolean isDisposed() {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public DestinationStatistics getDestinationStatistics() {
return new DestinationStatistics() {
private CountStatisticImpl newFakeCountStatistic(Answer<?> getCountFunction) {
CountStatisticImpl mock = Mockito.mock(CountStatisticImpl.class);
Mockito.doAnswer(getCountFunction).when(mock).getCount();
return mock;
}
@Override
public CountStatisticImpl getEnqueues() {
return newFakeCountStatistic(invocation -> view.getMessagesAdded());
}
@Override
public CountStatisticImpl getDequeues() {
return newFakeCountStatistic(invocation -> view.getMessagesAcknowledged());
}
@Override
public CountStatisticImpl getDispatched() {
return newFakeCountStatistic(invocation -> getDequeues().getCount() + getInflight().getCount());
}
@Override
public CountStatisticImpl getExpired() {
return newFakeCountStatistic(invocation -> view.getMessagesExpired());
}
@Override
public CountStatisticImpl getMessages() {
return newFakeCountStatistic(invocation -> view.getMessageCount());
}
@Override
public CountStatisticImpl getInflight() {
return newFakeCountStatistic(invocation -> (long) view.getDeliveringCount());
}
};
}
@Override
public DeadLetterStrategy getDeadLetterStrategy() {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public Message[] browse() {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public String getName() {
return name;
}
@Override
public MessageStore getMessageStore() {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public boolean isProducerFlowControl() {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public void setProducerFlowControl(boolean value) {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public boolean isAlwaysRetroactive() {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public void setAlwaysRetroactive(boolean value) {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public long getBlockedProducerWarningInterval() {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public void setBlockedProducerWarningInterval(long blockedProducerWarningInterval) {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public int getMaxProducersToAudit() {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public void setMaxProducersToAudit(int maxProducersToAudit) {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public int getMaxAuditDepth() {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public void setMaxAuditDepth(int maxAuditDepth) {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public boolean isEnableAudit() {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public void setEnableAudit(boolean enableAudit) {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public boolean isActive() {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public int getMaxPageSize() {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public void setMaxPageSize(int maxPageSize) {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public int getMaxBrowsePageSize() {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public void setMaxBrowsePageSize(int maxPageSize) {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public boolean isUseCache() {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public void setUseCache(boolean useCache) {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public int getMinimumMessageSize() {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public void setMinimumMessageSize(int minimumMessageSize) {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public int getCursorMemoryHighWaterMark() {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark) {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public void wakeup() {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public boolean isLazyDispatch() {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public void setLazyDispatch(boolean value) {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public void messageExpired(ConnectionContext context, Subscription subs, MessageReference node) {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public void messageConsumed(ConnectionContext context, MessageReference messageReference) {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public void messageDelivered(ConnectionContext context, MessageReference messageReference) {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public void messageDiscarded(ConnectionContext context, Subscription sub, MessageReference messageReference) {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public void slowConsumer(ConnectionContext context, Subscription subs) {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public void fastProducer(ConnectionContext context, ProducerInfo producerInfo) {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public void isFull(ConnectionContext context, Usage<?> usage) {
}
@Override
public List<Subscription> getConsumers() {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public boolean isPrioritizedMessages() {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public SlowConsumerStrategy getSlowConsumerStrategy() {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public boolean isDoOptimzeMessageStorage() {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public void setDoOptimzeMessageStorage(boolean doOptimzeMessageStorage) {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public void clearPendingMessages() {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public void duplicateFromStore(Message message, Subscription subscription) {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public void start() throws Exception {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public void stop() throws Exception {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public boolean iterate() {
throw new UnsupportedOperationException("Not implemented yet");
}
}

View File

@ -0,0 +1,423 @@
package org.apache.activemq.broker.region.policy;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.Connection;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ConsumerBrokerExchange;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.artemiswrapper.ArtemisBrokerWrapper;
import org.apache.activemq.broker.artemiswrapper.RegionProxy;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.Region;
import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.virtual.VirtualDestination;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.BrokerId;
import org.apache.activemq.command.BrokerInfo;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.ConsumerControl;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.DestinationInfo;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.MessageDispatchNotification;
import org.apache.activemq.command.MessagePull;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.RemoveSubscriptionInfo;
import org.apache.activemq.command.Response;
import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.store.PListStore;
import org.apache.activemq.thread.Scheduler;
import org.apache.activemq.usage.Usage;
import org.mockito.Mockito;
import javax.management.MBeanServer;
import java.net.URI;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ThreadPoolExecutor;
import static org.mockito.AdditionalAnswers.delegatesTo;
public class RegionBrokerProxy implements Broker {
private final ActiveMQServer server;
private final MBeanServer mBeanServer;
private RegionBrokerProxy(ArtemisBrokerWrapper wrapper) {
this.server = wrapper.getServer();
this.mBeanServer = wrapper.getMbeanServer();
}
public static RegionBroker newRegionBroker(ArtemisBrokerWrapper broker) {
Broker brokerProxy = null;
try {
brokerProxy = new RegionBrokerProxy(broker);
RegionBroker regionBroker = Mockito.mock(RegionBroker.class, delegatesTo(brokerProxy));
return regionBroker;
} catch (Exception e) {
throw new RuntimeException(e);
}
}
// RegionBroker methods called by enabled tests
public Region getTopicRegion() {
return RegionProxy.newTopicRegion(server);
}
public Region getQueueRegion() {
return RegionProxy.newQueueRegion(server);
}
//everything else, to satisfy the Broker interface
//we don't actually implement (wrap) many of these for now,
//just to make test compile pass.
@Override
public Broker getAdaptor(Class aClass) {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public BrokerId getBrokerId() {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public String getBrokerName() {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public void addBroker(Connection connection, BrokerInfo brokerInfo) {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public void removeBroker(Connection connection, BrokerInfo brokerInfo) {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public void addConnection(ConnectionContext connectionContext, ConnectionInfo connectionInfo) throws Exception {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public void removeConnection(ConnectionContext connectionContext, ConnectionInfo connectionInfo, Throwable throwable) throws Exception {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public void addSession(ConnectionContext connectionContext, SessionInfo sessionInfo) throws Exception {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public void removeSession(ConnectionContext connectionContext, SessionInfo sessionInfo) throws Exception {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public Destination addDestination(ConnectionContext connectionContext, ActiveMQDestination activeMQDestination, boolean b) throws Exception {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public void removeDestination(ConnectionContext connectionContext, ActiveMQDestination activeMQDestination, long l) throws Exception {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public Map<ActiveMQDestination, Destination> getDestinationMap() {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public Subscription addConsumer(ConnectionContext connectionContext, ConsumerInfo consumerInfo) throws Exception {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public void removeConsumer(ConnectionContext connectionContext, ConsumerInfo consumerInfo) throws Exception {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public void addProducer(ConnectionContext connectionContext, ProducerInfo producerInfo) throws Exception {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public void removeProducer(ConnectionContext connectionContext, ProducerInfo producerInfo) throws Exception {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public void removeSubscription(ConnectionContext connectionContext, RemoveSubscriptionInfo removeSubscriptionInfo) throws Exception {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public void send(ProducerBrokerExchange producerBrokerExchange, Message message) throws Exception {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public void acknowledge(ConsumerBrokerExchange consumerBrokerExchange, MessageAck messageAck) throws Exception {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public Response messagePull(ConnectionContext connectionContext, MessagePull messagePull) throws Exception {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public void gc() {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public Set<Destination> getDestinations(ActiveMQDestination activeMQDestination) {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public void processConsumerControl(ConsumerBrokerExchange consumerBrokerExchange, ConsumerControl consumerControl) {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public void reapplyInterceptor() {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public Connection[] getClients() throws Exception {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public ActiveMQDestination[] getDestinations() throws Exception {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public Map<ActiveMQDestination, Destination> getDestinationMap(ActiveMQDestination activeMQDestination) {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public TransactionId[] getPreparedTransactions(ConnectionContext connectionContext) throws Exception {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public void beginTransaction(ConnectionContext connectionContext, TransactionId transactionId) throws Exception {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public int prepareTransaction(ConnectionContext connectionContext, TransactionId transactionId) throws Exception {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public void rollbackTransaction(ConnectionContext connectionContext, TransactionId transactionId) throws Exception {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public void commitTransaction(ConnectionContext connectionContext, TransactionId transactionId, boolean b) throws Exception {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public void forgetTransaction(ConnectionContext connectionContext, TransactionId transactionId) throws Exception {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public BrokerInfo[] getPeerBrokerInfos() {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public void preProcessDispatch(MessageDispatch messageDispatch) {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public void postProcessDispatch(MessageDispatch messageDispatch) {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public boolean isStopped() {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public Set<ActiveMQDestination> getDurableDestinations() {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public void addDestinationInfo(ConnectionContext connectionContext, DestinationInfo destinationInfo) throws Exception {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public void removeDestinationInfo(ConnectionContext connectionContext, DestinationInfo destinationInfo) throws Exception {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public boolean isFaultTolerantConfiguration() {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public ConnectionContext getAdminConnectionContext() {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public void setAdminConnectionContext(ConnectionContext connectionContext) {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public PListStore getTempDataStore() {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public URI getVmConnectorURI() {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public void brokerServiceStarted() {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public BrokerService getBrokerService() {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public Broker getRoot() {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public boolean isExpired(MessageReference messageReference) {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public void messageExpired(ConnectionContext connectionContext, MessageReference messageReference, Subscription subscription) {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public boolean sendToDeadLetterQueue(ConnectionContext connectionContext, MessageReference messageReference, Subscription subscription, Throwable throwable) {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public long getBrokerSequenceId() {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public void messageConsumed(ConnectionContext connectionContext, MessageReference messageReference) {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public void messageDelivered(ConnectionContext connectionContext, MessageReference messageReference) {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public void messageDiscarded(ConnectionContext connectionContext, Subscription subscription, MessageReference messageReference) {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public void slowConsumer(ConnectionContext connectionContext, Destination destination, Subscription subscription) {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public void fastProducer(ConnectionContext connectionContext, ProducerInfo producerInfo, ActiveMQDestination activeMQDestination) {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public void isFull(ConnectionContext connectionContext, Destination destination, Usage usage) {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public void virtualDestinationAdded(ConnectionContext connectionContext, VirtualDestination virtualDestination) {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public void virtualDestinationRemoved(ConnectionContext connectionContext, VirtualDestination virtualDestination) {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public void nowMasterBroker() {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public Scheduler getScheduler() {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public ThreadPoolExecutor getExecutor() {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public void networkBridgeStarted(BrokerInfo brokerInfo, boolean b, String s) {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public void networkBridgeStopped(BrokerInfo brokerInfo) {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public void start() throws Exception {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public void stop() throws Exception {
throw new UnsupportedOperationException("Not implemented yet");
}
}