Removing dead code on openwire - DestinationStatistics & AuthorizationPolicy
This commit is contained in:
parent
5f16adde43
commit
ebb91edd42
|
@ -43,7 +43,6 @@ import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQCompositeConsum
|
|||
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConnectionContext;
|
||||
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConsumer;
|
||||
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConsumerBrokerExchange;
|
||||
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQMessageAuthorizationPolicy;
|
||||
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQProducerBrokerExchange;
|
||||
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQSession;
|
||||
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQSingleConsumerBrokerExchange;
|
||||
|
@ -129,8 +128,6 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor, S
|
|||
|
||||
private AMQConnectionContext context;
|
||||
|
||||
private AMQMessageAuthorizationPolicy messageAuthorizationPolicy;
|
||||
|
||||
private boolean manageable;
|
||||
|
||||
private boolean pendingStop;
|
||||
|
@ -561,7 +558,6 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor, S
|
|||
// for now we pass the manager as the connector and see what happens
|
||||
// it should be related to activemq's Acceptor
|
||||
context.setConnector(this.acceptorUsed);
|
||||
context.setMessageAuthorizationPolicy(getMessageAuthorizationPolicy());
|
||||
context.setFaultTolerant(faultTolerantConnection);
|
||||
context.setUserName(info.getUserName());
|
||||
context.setWireFormatInfo(wireFormatInfo);
|
||||
|
@ -703,10 +699,6 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor, S
|
|||
}
|
||||
}
|
||||
|
||||
private AMQMessageAuthorizationPolicy getMessageAuthorizationPolicy() {
|
||||
return this.messageAuthorizationPolicy;
|
||||
}
|
||||
|
||||
public void delayedStop(final int waitTime, final String reason, Throwable cause) {
|
||||
if (waitTime > 0) {
|
||||
synchronized (this) {
|
||||
|
|
|
@ -23,7 +23,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
|
|||
import org.apache.activemq.broker.region.MessageReference;
|
||||
import org.apache.activemq.command.ConnectionId;
|
||||
import org.apache.activemq.command.ConnectionInfo;
|
||||
import org.apache.activemq.command.TransactionId;
|
||||
import org.apache.activemq.command.WireFormatInfo;
|
||||
import org.apache.activemq.command.XATransactionId;
|
||||
import org.apache.activemq.filter.MessageEvaluationContext;
|
||||
|
@ -45,7 +44,6 @@ public class AMQConnectionContext {
|
|||
private WireFormatInfo wireFormatInfo;
|
||||
private Object longTermStoreContext;
|
||||
private boolean producerFlowControl = true;
|
||||
private AMQMessageAuthorizationPolicy messageAuthorizationPolicy;
|
||||
private boolean networkConnection;
|
||||
private boolean faultTolerant;
|
||||
private final AtomicBoolean stopping = new AtomicBoolean();
|
||||
|
@ -84,7 +82,6 @@ public class AMQConnectionContext {
|
|||
rc.wireFormatInfo = this.wireFormatInfo;
|
||||
rc.longTermStoreContext = this.longTermStoreContext;
|
||||
rc.producerFlowControl = this.producerFlowControl;
|
||||
rc.messageAuthorizationPolicy = this.messageAuthorizationPolicy;
|
||||
rc.networkConnection = this.networkConnection;
|
||||
rc.faultTolerant = this.faultTolerant;
|
||||
rc.stopping.set(this.stopping.get());
|
||||
|
@ -149,18 +146,6 @@ public class AMQConnectionContext {
|
|||
this.connector = connector;
|
||||
}
|
||||
|
||||
public AMQMessageAuthorizationPolicy getMessageAuthorizationPolicy() {
|
||||
return messageAuthorizationPolicy;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the policy used to decide if the current connection is authorized to
|
||||
* consume a given message
|
||||
*/
|
||||
public void setMessageAuthorizationPolicy(AMQMessageAuthorizationPolicy messageAuthorizationPolicy) {
|
||||
this.messageAuthorizationPolicy = messageAuthorizationPolicy;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return
|
||||
*/
|
||||
|
@ -232,13 +217,6 @@ public class AMQConnectionContext {
|
|||
this.producerFlowControl = disableProducerFlowControl;
|
||||
}
|
||||
|
||||
public boolean isAllowedToConsume(MessageReference n) throws IOException {
|
||||
if (messageAuthorizationPolicy != null) {
|
||||
return messageAuthorizationPolicy.isAllowedToConsume(this, n.getMessage());
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
public synchronized boolean isNetworkConnection() {
|
||||
return networkConnection;
|
||||
}
|
||||
|
|
|
@ -1,196 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.artemis.core.protocol.openwire.amq;
|
||||
|
||||
import org.apache.activemq.management.CountStatisticImpl;
|
||||
import org.apache.activemq.management.PollCountStatisticImpl;
|
||||
import org.apache.activemq.management.SizeStatisticImpl;
|
||||
import org.apache.activemq.management.StatsImpl;
|
||||
import org.apache.activemq.management.TimeStatisticImpl;
|
||||
|
||||
public class AMQDestinationStatistics extends StatsImpl {
|
||||
|
||||
protected CountStatisticImpl enqueues;
|
||||
protected CountStatisticImpl dequeues;
|
||||
protected CountStatisticImpl consumers;
|
||||
protected CountStatisticImpl producers;
|
||||
protected CountStatisticImpl messages;
|
||||
protected PollCountStatisticImpl messagesCached;
|
||||
protected CountStatisticImpl dispatched;
|
||||
protected CountStatisticImpl inflight;
|
||||
protected CountStatisticImpl expired;
|
||||
protected TimeStatisticImpl processTime;
|
||||
protected CountStatisticImpl blockedSends;
|
||||
protected TimeStatisticImpl blockedTime;
|
||||
protected SizeStatisticImpl messageSize;
|
||||
|
||||
public AMQDestinationStatistics() {
|
||||
|
||||
enqueues = new CountStatisticImpl("enqueues", "The number of messages that have been sent to the destination");
|
||||
dispatched = new CountStatisticImpl("dispatched", "The number of messages that have been dispatched from the destination");
|
||||
dequeues = new CountStatisticImpl("dequeues", "The number of messages that have been acknowledged from the destination");
|
||||
inflight = new CountStatisticImpl("inflight", "The number of messages dispatched but awaiting acknowledgement");
|
||||
expired = new CountStatisticImpl("expired", "The number of messages that have expired");
|
||||
|
||||
consumers = new CountStatisticImpl("consumers", "The number of consumers that that are subscribing to messages from the destination");
|
||||
consumers.setDoReset(false);
|
||||
producers = new CountStatisticImpl("producers", "The number of producers that that are publishing messages to the destination");
|
||||
producers.setDoReset(false);
|
||||
messages = new CountStatisticImpl("messages", "The number of messages that that are being held by the destination");
|
||||
messages.setDoReset(false);
|
||||
messagesCached = new PollCountStatisticImpl("messagesCached", "The number of messages that are held in the destination's memory cache");
|
||||
processTime = new TimeStatisticImpl("processTime", "information around length of time messages are held by a destination");
|
||||
blockedSends = new CountStatisticImpl("blockedSends", "number of messages that have to wait for flow control");
|
||||
blockedTime = new TimeStatisticImpl("blockedTime", "amount of time messages are blocked for flow control");
|
||||
messageSize = new SizeStatisticImpl("messageSize", "Size of messages passing through the destination");
|
||||
addStatistic("enqueues", enqueues);
|
||||
addStatistic("dispatched", dispatched);
|
||||
addStatistic("dequeues", dequeues);
|
||||
addStatistic("inflight", inflight);
|
||||
addStatistic("expired", expired);
|
||||
addStatistic("consumers", consumers);
|
||||
addStatistic("producers", producers);
|
||||
addStatistic("messages", messages);
|
||||
addStatistic("messagesCached", messagesCached);
|
||||
addStatistic("processTime", processTime);
|
||||
addStatistic("blockedSends", blockedSends);
|
||||
addStatistic("blockedTime", blockedTime);
|
||||
addStatistic("messageSize", messageSize);
|
||||
}
|
||||
|
||||
public CountStatisticImpl getEnqueues() {
|
||||
return enqueues;
|
||||
}
|
||||
|
||||
public CountStatisticImpl getDequeues() {
|
||||
return dequeues;
|
||||
}
|
||||
|
||||
public CountStatisticImpl getInflight() {
|
||||
return inflight;
|
||||
}
|
||||
|
||||
public CountStatisticImpl getExpired() {
|
||||
return expired;
|
||||
}
|
||||
|
||||
public CountStatisticImpl getConsumers() {
|
||||
return consumers;
|
||||
}
|
||||
|
||||
public CountStatisticImpl getProducers() {
|
||||
return producers;
|
||||
}
|
||||
|
||||
public PollCountStatisticImpl getMessagesCached() {
|
||||
return messagesCached;
|
||||
}
|
||||
|
||||
public CountStatisticImpl getMessages() {
|
||||
return messages;
|
||||
}
|
||||
|
||||
public void setMessagesCached(PollCountStatisticImpl messagesCached) {
|
||||
this.messagesCached = messagesCached;
|
||||
}
|
||||
|
||||
public CountStatisticImpl getDispatched() {
|
||||
return dispatched;
|
||||
}
|
||||
|
||||
public TimeStatisticImpl getProcessTime() {
|
||||
return this.processTime;
|
||||
}
|
||||
|
||||
public CountStatisticImpl getBlockedSends() {
|
||||
return this.blockedSends;
|
||||
}
|
||||
|
||||
public TimeStatisticImpl getBlockedTime() {
|
||||
return this.blockedTime;
|
||||
}
|
||||
|
||||
public SizeStatisticImpl getMessageSize() {
|
||||
return this.messageSize;
|
||||
}
|
||||
|
||||
public void reset() {
|
||||
if (this.isDoReset()) {
|
||||
super.reset();
|
||||
enqueues.reset();
|
||||
dequeues.reset();
|
||||
dispatched.reset();
|
||||
inflight.reset();
|
||||
expired.reset();
|
||||
blockedSends.reset();
|
||||
blockedTime.reset();
|
||||
messageSize.reset();
|
||||
}
|
||||
}
|
||||
|
||||
public void setEnabled(boolean enabled) {
|
||||
super.setEnabled(enabled);
|
||||
enqueues.setEnabled(enabled);
|
||||
dispatched.setEnabled(enabled);
|
||||
dequeues.setEnabled(enabled);
|
||||
inflight.setEnabled(enabled);
|
||||
expired.setEnabled(true);
|
||||
consumers.setEnabled(enabled);
|
||||
producers.setEnabled(enabled);
|
||||
messages.setEnabled(enabled);
|
||||
messagesCached.setEnabled(enabled);
|
||||
processTime.setEnabled(enabled);
|
||||
blockedSends.setEnabled(enabled);
|
||||
blockedTime.setEnabled(enabled);
|
||||
messageSize.setEnabled(enabled);
|
||||
|
||||
}
|
||||
|
||||
public void setParent(AMQDestinationStatistics parent) {
|
||||
if (parent != null) {
|
||||
enqueues.setParent(parent.enqueues);
|
||||
dispatched.setParent(parent.dispatched);
|
||||
dequeues.setParent(parent.dequeues);
|
||||
inflight.setParent(parent.inflight);
|
||||
expired.setParent(parent.expired);
|
||||
consumers.setParent(parent.consumers);
|
||||
producers.setParent(parent.producers);
|
||||
messagesCached.setParent(parent.messagesCached);
|
||||
messages.setParent(parent.messages);
|
||||
processTime.setParent(parent.processTime);
|
||||
blockedSends.setParent(parent.blockedSends);
|
||||
blockedTime.setParent(parent.blockedTime);
|
||||
messageSize.setParent(parent.messageSize);
|
||||
}
|
||||
else {
|
||||
enqueues.setParent(null);
|
||||
dispatched.setParent(null);
|
||||
dequeues.setParent(null);
|
||||
inflight.setParent(null);
|
||||
expired.setParent(null);
|
||||
consumers.setParent(null);
|
||||
producers.setParent(null);
|
||||
messagesCached.setParent(null);
|
||||
messages.setParent(null);
|
||||
processTime.setParent(null);
|
||||
blockedSends.setParent(null);
|
||||
blockedTime.setParent(null);
|
||||
messageSize.setParent(null);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -1,31 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.artemis.core.protocol.openwire.amq;
|
||||
|
||||
import org.apache.activemq.command.Message;
|
||||
|
||||
public interface AMQMessageAuthorizationPolicy {
|
||||
|
||||
/**
|
||||
* Returns true if the given message is able to be dispatched to the connection
|
||||
* performing any user
|
||||
*
|
||||
* @return true if the context is allowed to consume the message
|
||||
*/
|
||||
boolean isAllowedToConsume(AMQConnectionContext context, Message message);
|
||||
|
||||
}
|
Loading…
Reference in New Issue