From 1b3d9cfed0a9b525bf8b1ddee47f21a050e96989 Mon Sep 17 00:00:00 2001 From: Jiri Danek Date: Mon, 26 Jun 2017 15:16:44 +0200 Subject: [PATCH] ARTEMIS-1276 fix JmsSendReceiveWithMessageExpirationTest --- pom.xml | 9 + tests/activemq5-unit-tests/pom.xml | 7 + .../apache/activemq/broker/BrokerService.java | 13 +- .../artemiswrapper/ArtemisBrokerWrapper.java | 1 + .../broker/artemiswrapper/RegionProxy.java | 157 +++++ .../region/policy/DestinationProxy.java | 645 ++++++++++++++++++ .../region/policy/RegionBrokerProxy.java | 423 ++++++++++++ 7 files changed, 1249 insertions(+), 6 deletions(-) create mode 100644 tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/RegionProxy.java create mode 100644 tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/region/policy/DestinationProxy.java create mode 100644 tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/region/policy/RegionBrokerProxy.java diff --git a/pom.xml b/pom.xml index 81f5196085..163d928a19 100644 --- a/pom.xml +++ b/pom.xml @@ -84,6 +84,7 @@ 9.4.3.v20170317 3.6.13.Final 2.4 + 2.8.47 4.1.9.Final 0.19.0 3.0.19.Final @@ -275,6 +276,14 @@ ${commons.collections.version} + + + org.mockito + mockito-core + ${mockito.version} + test + + diff --git a/tests/activemq5-unit-tests/pom.xml b/tests/activemq5-unit-tests/pom.xml index cd58170b56..f722cf6891 100644 --- a/tests/activemq5-unit-tests/pom.xml +++ b/tests/activemq5-unit-tests/pom.xml @@ -62,6 +62,13 @@ test-jar + + + org.mockito + mockito-core + compile + + org.apache.activemq activemq-client diff --git a/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/BrokerService.java b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/BrokerService.java index 58885d6d6f..d944bce371 100644 --- a/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/BrokerService.java +++ b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/BrokerService.java @@ -46,6 +46,7 @@ import org.apache.activemq.broker.jmx.ManagementContext; import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.DestinationInterceptor; import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.broker.region.policy.RegionBrokerProxy; import org.apache.activemq.broker.scheduler.JobSchedulerStore; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.BrokerId; @@ -250,14 +251,14 @@ public class BrokerService implements Service { } //below are methods called directly by tests - //we don't actually implement any of these for now, + //we don't actually implement many of these for now, //just to make test compile pass. - - //we may get class cast exception as in TestSupport it - //casts the broker to RegionBroker, which we didn't - //implement (wrap) yet. Consider solving it later. public Broker getRegionBroker() { - return broker; + try { + return RegionBrokerProxy.newRegionBroker((ArtemisBrokerWrapper) getBroker()); + } catch (Exception e) { + throw new RuntimeException(e); + } } public void setPersistenceAdapter(PersistenceAdapter persistenceAdapter) throws IOException { diff --git a/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/ArtemisBrokerWrapper.java b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/ArtemisBrokerWrapper.java index 714aa55dc6..91498b83af 100644 --- a/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/ArtemisBrokerWrapper.java +++ b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/ArtemisBrokerWrapper.java @@ -90,6 +90,7 @@ public class ArtemisBrokerWrapper extends ArtemisBrokerBase { } SimpleString dla = new SimpleString("ActiveMQ.DLQ"); commonSettings.setDeadLetterAddress(dla); + commonSettings.setExpiryAddress(dla); commonSettings.setAutoCreateQueues(true); commonSettings.setAutoCreateAddresses(true); diff --git a/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/RegionProxy.java b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/RegionProxy.java new file mode 100644 index 0000000000..b6e527e84d --- /dev/null +++ b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/RegionProxy.java @@ -0,0 +1,157 @@ +package org.apache.activemq.broker.artemiswrapper; + +import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.postoffice.QueueBinding; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.broker.ConnectionContext; +import org.apache.activemq.broker.ConsumerBrokerExchange; +import org.apache.activemq.broker.ProducerBrokerExchange; +import org.apache.activemq.broker.region.Destination; +import org.apache.activemq.broker.region.QueueRegion; +import org.apache.activemq.broker.region.Region; +import org.apache.activemq.broker.region.Subscription; +import org.apache.activemq.broker.region.TopicRegion; +import org.apache.activemq.broker.region.policy.DestinationProxy; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.ConsumerControl; +import org.apache.activemq.command.ConsumerInfo; +import org.apache.activemq.command.Message; +import org.apache.activemq.command.MessageAck; +import org.apache.activemq.command.MessageDispatchNotification; +import org.apache.activemq.command.MessagePull; +import org.apache.activemq.command.ProducerInfo; +import org.apache.activemq.command.RemoveSubscriptionInfo; +import org.apache.activemq.command.Response; +import org.mockito.AdditionalAnswers; +import org.mockito.Mockito; + +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +public class RegionProxy implements Region { + private final ActiveMQServer server; + private final RoutingType routingType; + + private RegionProxy(ActiveMQServer activeMQServer, RoutingType routingType) { + this.server = activeMQServer; + this.routingType = routingType; + } + + public static Region newQueueRegion(ActiveMQServer activeMQServer) { + return Mockito.mock(QueueRegion.class, AdditionalAnswers.delegatesTo(new RegionProxy(activeMQServer, RoutingType.ANYCAST))); + } + + public static Region newTopicRegion(ActiveMQServer activeMQServer) { + return Mockito.mock(TopicRegion.class, AdditionalAnswers.delegatesTo(new RegionProxy(activeMQServer, RoutingType.MULTICAST))); + } + + @Override + public Destination addDestination(ConnectionContext context, ActiveMQDestination destination, boolean createIfTemporary) throws Exception { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + public Map getDestinationMap() { + return server.getPostOffice().getAllBindings().entrySet().stream() + .filter(e -> e.getValue() instanceof QueueBinding) + .filter(e -> { + final SimpleString address = ((QueueBinding) e.getValue()).getQueue().getAddress(); + return server.getAddressInfo(address).getRoutingType() == routingType; + } + ) + .collect(Collectors.toMap( + e -> { + final String uniqueName = e.getValue().getUniqueName().toString(); + return new ActiveMQQueue(uniqueName); + }, + e -> { + final Queue queue = ((QueueBinding) e.getValue()).getQueue(); + final String address = e.getValue().getAddress().toString(); + return new DestinationProxy(queue, address, server); + })); + } + + @Override + public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + public void send(ProducerBrokerExchange producerExchange, Message message) throws Exception { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + public Response messagePull(ConnectionContext context, MessagePull pull) throws Exception { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + public void gc() { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + public Set getDestinations(ActiveMQDestination destination) { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + public void processConsumerControl(ConsumerBrokerExchange consumerExchange, ConsumerControl control) { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + public void reapplyInterceptor() { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + public void start() throws Exception { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + public void stop() throws Exception { + throw new UnsupportedOperationException("Not implemented yet"); + } +} diff --git a/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/region/policy/DestinationProxy.java b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/region/policy/DestinationProxy.java new file mode 100644 index 0000000000..e5e8e726e7 --- /dev/null +++ b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/region/policy/DestinationProxy.java @@ -0,0 +1,645 @@ +package org.apache.activemq.broker.region.policy; + +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.broker.ConnectionContext; +import org.apache.activemq.broker.ProducerBrokerExchange; +import org.apache.activemq.broker.region.Destination; +import org.apache.activemq.broker.region.DestinationStatistics; +import org.apache.activemq.broker.region.MessageReference; +import org.apache.activemq.broker.region.Subscription; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.Message; +import org.apache.activemq.command.MessageAck; +import org.apache.activemq.command.MessageDispatchNotification; +import org.apache.activemq.command.ProducerInfo; +import org.apache.activemq.management.CountStatisticImpl; +import org.apache.activemq.store.MessageStore; +import org.apache.activemq.usage.MemoryUsage; +import org.apache.activemq.usage.Usage; +import org.apache.activemq.usage.UsageCapacity; +import org.apache.activemq.usage.UsageListener; +import org.mockito.Mockito; +import org.mockito.stubbing.Answer; + +import java.io.IOException; +import java.util.List; +import java.util.concurrent.ThreadPoolExecutor; + +public class DestinationProxy implements Destination { + private final String name; + private final Queue view; + private final ActiveMQServer server; + + public DestinationProxy(Queue view, String name, ActiveMQServer server) { + this.view = view; + this.name = name; + this.server = server; + } + + // Destination + + @Override + public void addSubscription(ConnectionContext context, Subscription sub) throws Exception { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + public void removeSubscription(ConnectionContext context, Subscription sub, long lastDeliveredSequenceId) throws Exception { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + public void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + public void acknowledge(ConnectionContext context, Subscription sub, MessageAck ack, MessageReference node) throws IOException { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + public long getInactiveTimeoutBeforeGC() { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + public void markForGC(long timeStamp) { + throw new UnsupportedOperationException("Not implemented yet"); + + } + + @Override + public boolean canGC() { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + public void gc() { + throw new UnsupportedOperationException("Not implemented yet"); + + } + + @Override + public ActiveMQDestination getActiveMQDestination() { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + public MemoryUsage getMemoryUsage() { + return new MemoryUsage() { + @Override + public void waitForSpace() throws InterruptedException { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + public boolean waitForSpace(long timeout) throws InterruptedException { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + public boolean isFull() { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + public void enqueueUsage(long value) throws InterruptedException { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + public void increaseUsage(long value) { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + public void decreaseUsage(long value) { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + protected long retrieveUsage() { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + public long getUsage() { + try { + return server.getPagingManager().getPageStore(view.getAddress()).getAddressSize(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @Override + public void setUsage(long usage) { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + public void setPercentOfJvmHeap(int percentOfJvmHeap) { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + public boolean waitForSpace(long timeout, int highWaterMark) throws InterruptedException { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + public boolean isFull(int highWaterMark) { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + public void addUsageListener(UsageListener listener) { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + public void removeUsageListener(UsageListener listener) { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + public int getNumUsageListeners() { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + public long getLimit() { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + public void setLimit(long limit) { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + protected void onLimitChange() { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + public float getUsagePortion() { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + public void setUsagePortion(float usagePortion) { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + public int getPercentUsage() { + long total = 0; + try { + total = server.getPagingManager().getPageStore(view.getAddress()).getMaxSize(); + } catch (Exception e) { + throw new RuntimeException(e); + } + return (int) ((float) getUsage() / total * 100.0); + } + + @Override + protected void setPercentUsage(int value) { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + public int getPercentUsageMinDelta() { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + public void setPercentUsageMinDelta(int percentUsageMinDelta) { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + protected int caclPercentUsage() { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + public String getName() { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + public void setName(String name) { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + public String toString() { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + public void start() { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + public void stop() { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + protected void addChild(MemoryUsage child) { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + protected void removeChild(MemoryUsage child) { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + public boolean notifyCallbackWhenNotFull(Runnable callback) { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + public UsageCapacity getLimiter() { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + public void setLimiter(UsageCapacity limiter) { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + public int getPollingTime() { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + public void setPollingTime(int pollingTime) { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + public MemoryUsage getParent() { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + public void setParent(MemoryUsage parent) { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + public ThreadPoolExecutor getExecutor() { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + public void setExecutor(ThreadPoolExecutor executor) { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + public boolean isStarted() { + throw new UnsupportedOperationException("Not implemented yet"); + } + }; + } + + @Override + public void setMemoryUsage(MemoryUsage memoryUsage) { + throw new UnsupportedOperationException("Not implemented yet"); + + } + + @Override + public void dispose(ConnectionContext context) throws IOException { + throw new UnsupportedOperationException("Not implemented yet"); + + } + + @Override + public boolean isDisposed() { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + public DestinationStatistics getDestinationStatistics() { + return new DestinationStatistics() { + private CountStatisticImpl newFakeCountStatistic(Answer getCountFunction) { + CountStatisticImpl mock = Mockito.mock(CountStatisticImpl.class); + Mockito.doAnswer(getCountFunction).when(mock).getCount(); + return mock; + } + + @Override + public CountStatisticImpl getEnqueues() { + return newFakeCountStatistic(invocation -> view.getMessagesAdded()); + } + + @Override + public CountStatisticImpl getDequeues() { + return newFakeCountStatistic(invocation -> view.getMessagesAcknowledged()); + } + + @Override + public CountStatisticImpl getDispatched() { + return newFakeCountStatistic(invocation -> getDequeues().getCount() + getInflight().getCount()); + } + + @Override + public CountStatisticImpl getExpired() { + return newFakeCountStatistic(invocation -> view.getMessagesExpired()); + } + + @Override + public CountStatisticImpl getMessages() { + return newFakeCountStatistic(invocation -> view.getMessageCount()); + } + + @Override + public CountStatisticImpl getInflight() { + return newFakeCountStatistic(invocation -> (long) view.getDeliveringCount()); + } + }; + } + + @Override + public DeadLetterStrategy getDeadLetterStrategy() { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + public Message[] browse() { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + public String getName() { + return name; + } + + @Override + public MessageStore getMessageStore() { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + public boolean isProducerFlowControl() { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + public void setProducerFlowControl(boolean value) { + throw new UnsupportedOperationException("Not implemented yet"); + + } + + @Override + public boolean isAlwaysRetroactive() { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + public void setAlwaysRetroactive(boolean value) { + throw new UnsupportedOperationException("Not implemented yet"); + + } + + @Override + public long getBlockedProducerWarningInterval() { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + public void setBlockedProducerWarningInterval(long blockedProducerWarningInterval) { + throw new UnsupportedOperationException("Not implemented yet"); + + } + + @Override + public int getMaxProducersToAudit() { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + public void setMaxProducersToAudit(int maxProducersToAudit) { + throw new UnsupportedOperationException("Not implemented yet"); + + } + + @Override + public int getMaxAuditDepth() { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + public void setMaxAuditDepth(int maxAuditDepth) { + throw new UnsupportedOperationException("Not implemented yet"); + + } + + @Override + public boolean isEnableAudit() { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + public void setEnableAudit(boolean enableAudit) { + throw new UnsupportedOperationException("Not implemented yet"); + + } + + @Override + public boolean isActive() { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + public int getMaxPageSize() { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + public void setMaxPageSize(int maxPageSize) { + throw new UnsupportedOperationException("Not implemented yet"); + + } + + @Override + public int getMaxBrowsePageSize() { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + public void setMaxBrowsePageSize(int maxPageSize) { + throw new UnsupportedOperationException("Not implemented yet"); + + } + + @Override + public boolean isUseCache() { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + public void setUseCache(boolean useCache) { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + public int getMinimumMessageSize() { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + public void setMinimumMessageSize(int minimumMessageSize) { + throw new UnsupportedOperationException("Not implemented yet"); + + } + + @Override + public int getCursorMemoryHighWaterMark() { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + public void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark) { + throw new UnsupportedOperationException("Not implemented yet"); + + } + + @Override + public void wakeup() { + throw new UnsupportedOperationException("Not implemented yet"); + + } + + @Override + public boolean isLazyDispatch() { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + public void setLazyDispatch(boolean value) { + throw new UnsupportedOperationException("Not implemented yet"); + + } + + @Override + public void messageExpired(ConnectionContext context, Subscription subs, MessageReference node) { + throw new UnsupportedOperationException("Not implemented yet"); + + } + + @Override + public void messageConsumed(ConnectionContext context, MessageReference messageReference) { + throw new UnsupportedOperationException("Not implemented yet"); + + } + + @Override + public void messageDelivered(ConnectionContext context, MessageReference messageReference) { + throw new UnsupportedOperationException("Not implemented yet"); + + } + + @Override + public void messageDiscarded(ConnectionContext context, Subscription sub, MessageReference messageReference) { + throw new UnsupportedOperationException("Not implemented yet"); + + } + + @Override + public void slowConsumer(ConnectionContext context, Subscription subs) { + throw new UnsupportedOperationException("Not implemented yet"); + + } + + @Override + public void fastProducer(ConnectionContext context, ProducerInfo producerInfo) { + throw new UnsupportedOperationException("Not implemented yet"); + + } + + @Override + public void isFull(ConnectionContext context, Usage usage) { + + } + + @Override + public List getConsumers() { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception { + throw new UnsupportedOperationException("Not implemented yet"); + + } + + @Override + public boolean isPrioritizedMessages() { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + public SlowConsumerStrategy getSlowConsumerStrategy() { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + public boolean isDoOptimzeMessageStorage() { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + public void setDoOptimzeMessageStorage(boolean doOptimzeMessageStorage) { + throw new UnsupportedOperationException("Not implemented yet"); + + } + + @Override + public void clearPendingMessages() { + throw new UnsupportedOperationException("Not implemented yet"); + + } + + @Override + public void duplicateFromStore(Message message, Subscription subscription) { + throw new UnsupportedOperationException("Not implemented yet"); + + } + + @Override + public void start() throws Exception { + throw new UnsupportedOperationException("Not implemented yet"); + + } + + @Override + public void stop() throws Exception { + throw new UnsupportedOperationException("Not implemented yet"); + + } + + @Override + public boolean iterate() { + throw new UnsupportedOperationException("Not implemented yet"); + } +} diff --git a/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/region/policy/RegionBrokerProxy.java b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/region/policy/RegionBrokerProxy.java new file mode 100644 index 0000000000..e9faaf52d9 --- /dev/null +++ b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/region/policy/RegionBrokerProxy.java @@ -0,0 +1,423 @@ +package org.apache.activemq.broker.region.policy; + +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.broker.Broker; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.Connection; +import org.apache.activemq.broker.ConnectionContext; +import org.apache.activemq.broker.ConsumerBrokerExchange; +import org.apache.activemq.broker.ProducerBrokerExchange; +import org.apache.activemq.broker.artemiswrapper.ArtemisBrokerWrapper; +import org.apache.activemq.broker.artemiswrapper.RegionProxy; +import org.apache.activemq.broker.region.Destination; +import org.apache.activemq.broker.region.MessageReference; +import org.apache.activemq.broker.region.Region; +import org.apache.activemq.broker.region.RegionBroker; +import org.apache.activemq.broker.region.Subscription; +import org.apache.activemq.broker.region.virtual.VirtualDestination; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.BrokerId; +import org.apache.activemq.command.BrokerInfo; +import org.apache.activemq.command.ConnectionInfo; +import org.apache.activemq.command.ConsumerControl; +import org.apache.activemq.command.ConsumerInfo; +import org.apache.activemq.command.DestinationInfo; +import org.apache.activemq.command.Message; +import org.apache.activemq.command.MessageAck; +import org.apache.activemq.command.MessageDispatch; +import org.apache.activemq.command.MessageDispatchNotification; +import org.apache.activemq.command.MessagePull; +import org.apache.activemq.command.ProducerInfo; +import org.apache.activemq.command.RemoveSubscriptionInfo; +import org.apache.activemq.command.Response; +import org.apache.activemq.command.SessionInfo; +import org.apache.activemq.command.TransactionId; +import org.apache.activemq.store.PListStore; +import org.apache.activemq.thread.Scheduler; +import org.apache.activemq.usage.Usage; +import org.mockito.Mockito; + +import javax.management.MBeanServer; +import java.net.URI; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ThreadPoolExecutor; + +import static org.mockito.AdditionalAnswers.delegatesTo; + +public class RegionBrokerProxy implements Broker { + + private final ActiveMQServer server; + private final MBeanServer mBeanServer; + + private RegionBrokerProxy(ArtemisBrokerWrapper wrapper) { + this.server = wrapper.getServer(); + this.mBeanServer = wrapper.getMbeanServer(); + } + + public static RegionBroker newRegionBroker(ArtemisBrokerWrapper broker) { + Broker brokerProxy = null; + try { + brokerProxy = new RegionBrokerProxy(broker); + RegionBroker regionBroker = Mockito.mock(RegionBroker.class, delegatesTo(brokerProxy)); + return regionBroker; + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + // RegionBroker methods called by enabled tests + + public Region getTopicRegion() { + return RegionProxy.newTopicRegion(server); + } + + public Region getQueueRegion() { + return RegionProxy.newQueueRegion(server); + } + + //everything else, to satisfy the Broker interface + //we don't actually implement (wrap) many of these for now, + //just to make test compile pass. + @Override + public Broker getAdaptor(Class aClass) { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + public BrokerId getBrokerId() { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + public String getBrokerName() { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + public void addBroker(Connection connection, BrokerInfo brokerInfo) { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + public void removeBroker(Connection connection, BrokerInfo brokerInfo) { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + public void addConnection(ConnectionContext connectionContext, ConnectionInfo connectionInfo) throws Exception { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + public void removeConnection(ConnectionContext connectionContext, ConnectionInfo connectionInfo, Throwable throwable) throws Exception { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + public void addSession(ConnectionContext connectionContext, SessionInfo sessionInfo) throws Exception { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + public void removeSession(ConnectionContext connectionContext, SessionInfo sessionInfo) throws Exception { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + public Destination addDestination(ConnectionContext connectionContext, ActiveMQDestination activeMQDestination, boolean b) throws Exception { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + public void removeDestination(ConnectionContext connectionContext, ActiveMQDestination activeMQDestination, long l) throws Exception { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + public Map getDestinationMap() { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + public Subscription addConsumer(ConnectionContext connectionContext, ConsumerInfo consumerInfo) throws Exception { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + public void removeConsumer(ConnectionContext connectionContext, ConsumerInfo consumerInfo) throws Exception { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + public void addProducer(ConnectionContext connectionContext, ProducerInfo producerInfo) throws Exception { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + public void removeProducer(ConnectionContext connectionContext, ProducerInfo producerInfo) throws Exception { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + public void removeSubscription(ConnectionContext connectionContext, RemoveSubscriptionInfo removeSubscriptionInfo) throws Exception { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + public void send(ProducerBrokerExchange producerBrokerExchange, Message message) throws Exception { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + public void acknowledge(ConsumerBrokerExchange consumerBrokerExchange, MessageAck messageAck) throws Exception { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + public Response messagePull(ConnectionContext connectionContext, MessagePull messagePull) throws Exception { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + public void gc() { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + public Set getDestinations(ActiveMQDestination activeMQDestination) { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + public void processConsumerControl(ConsumerBrokerExchange consumerBrokerExchange, ConsumerControl consumerControl) { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + public void reapplyInterceptor() { + throw new UnsupportedOperationException("Not implemented yet"); + + } + + @Override + public Connection[] getClients() throws Exception { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + public ActiveMQDestination[] getDestinations() throws Exception { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + public Map getDestinationMap(ActiveMQDestination activeMQDestination) { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + public TransactionId[] getPreparedTransactions(ConnectionContext connectionContext) throws Exception { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + public void beginTransaction(ConnectionContext connectionContext, TransactionId transactionId) throws Exception { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + public int prepareTransaction(ConnectionContext connectionContext, TransactionId transactionId) throws Exception { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + public void rollbackTransaction(ConnectionContext connectionContext, TransactionId transactionId) throws Exception { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + public void commitTransaction(ConnectionContext connectionContext, TransactionId transactionId, boolean b) throws Exception { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + public void forgetTransaction(ConnectionContext connectionContext, TransactionId transactionId) throws Exception { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + public BrokerInfo[] getPeerBrokerInfos() { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + public void preProcessDispatch(MessageDispatch messageDispatch) { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + public void postProcessDispatch(MessageDispatch messageDispatch) { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + public boolean isStopped() { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + public Set getDurableDestinations() { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + public void addDestinationInfo(ConnectionContext connectionContext, DestinationInfo destinationInfo) throws Exception { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + public void removeDestinationInfo(ConnectionContext connectionContext, DestinationInfo destinationInfo) throws Exception { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + public boolean isFaultTolerantConfiguration() { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + public ConnectionContext getAdminConnectionContext() { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + public void setAdminConnectionContext(ConnectionContext connectionContext) { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + public PListStore getTempDataStore() { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + public URI getVmConnectorURI() { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + public void brokerServiceStarted() { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + public BrokerService getBrokerService() { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + public Broker getRoot() { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + public boolean isExpired(MessageReference messageReference) { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + public void messageExpired(ConnectionContext connectionContext, MessageReference messageReference, Subscription subscription) { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + public boolean sendToDeadLetterQueue(ConnectionContext connectionContext, MessageReference messageReference, Subscription subscription, Throwable throwable) { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + public long getBrokerSequenceId() { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + public void messageConsumed(ConnectionContext connectionContext, MessageReference messageReference) { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + public void messageDelivered(ConnectionContext connectionContext, MessageReference messageReference) { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + public void messageDiscarded(ConnectionContext connectionContext, Subscription subscription, MessageReference messageReference) { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + public void slowConsumer(ConnectionContext connectionContext, Destination destination, Subscription subscription) { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + public void fastProducer(ConnectionContext connectionContext, ProducerInfo producerInfo, ActiveMQDestination activeMQDestination) { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + public void isFull(ConnectionContext connectionContext, Destination destination, Usage usage) { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + public void virtualDestinationAdded(ConnectionContext connectionContext, VirtualDestination virtualDestination) { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + public void virtualDestinationRemoved(ConnectionContext connectionContext, VirtualDestination virtualDestination) { + throw new UnsupportedOperationException("Not implemented yet"); + + } + + @Override + public void nowMasterBroker() { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + public Scheduler getScheduler() { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + public ThreadPoolExecutor getExecutor() { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + public void networkBridgeStarted(BrokerInfo brokerInfo, boolean b, String s) { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + public void networkBridgeStopped(BrokerInfo brokerInfo) { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + public void start() throws Exception { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + public void stop() throws Exception { + throw new UnsupportedOperationException("Not implemented yet"); + } +}