removing more dead code on openwire

(classes without any implementation detail, no code usage or duplicate meaning)
This commit is contained in:
Clebert Suconic 2015-08-28 19:32:46 -04:00
parent aecea5142d
commit 9cf4c18ea5
7 changed files with 12 additions and 338 deletions

View File

@ -1,110 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.protocol.openwire;
import org.apache.activemq.command.BrokerInfo;
import org.apache.activemq.command.ConnectionControl;
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConnector;
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConnectorStatistics;
import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
public class AMQConnectorImpl implements AMQConnector {
private Acceptor acceptor;
public AMQConnectorImpl(Acceptor acceptorUsed) {
this.acceptor = acceptorUsed;
}
@Override
public BrokerInfo getBrokerInfo() {
// TODO Auto-generated method stub
return null;
}
@Override
public AMQConnectorStatistics getStatistics() {
// TODO Auto-generated method stub
return null;
}
@Override
public boolean isUpdateClusterClients() {
// TODO Auto-generated method stub
return false;
}
@Override
public boolean isRebalanceClusterClients() {
// TODO Auto-generated method stub
return false;
}
@Override
public void updateClientClusterInfo() {
// TODO Auto-generated method stub
}
@Override
public boolean isUpdateClusterClientsOnRemove() {
// TODO Auto-generated method stub
return false;
}
@Override
public int connectionCount() {
// TODO Auto-generated method stub
return 0;
}
@Override
public boolean isAllowLinkStealing() {
// TODO Auto-generated method stub
return true;
}
@Override
public ConnectionControl getConnectionControl() {
return new ConnectionControl();
}
@Override
public void onStarted(OpenWireConnection connection) {
// TODO Auto-generated method stub
}
@Override
public void onStopped(OpenWireConnection connection) {
// TODO Auto-generated method stub
}
public int getMaximumConsumersAllowedPerConnection() {
return 1000000;//this belongs to configuration, now hardcoded
}
public int getMaximumProducersAllowedPerConnection() {
return 1000000;//this belongs to configuration, now hardcoded
}
public boolean isAuditNetworkProducers() {
return false;
}
}

View File

@ -1,24 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.protocol.openwire;
/**
* The class holds related states of an activemq broker.
*/
public class BrokerState {
}

View File

@ -106,8 +106,6 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor, S
private final Connection transportConnection;
private final AMQConnectorImpl acceptorUsed;
private final long creationTime;
private final List<FailureListener> failureListeners = new CopyOnWriteArrayList<FailureListener>();
@ -120,6 +118,8 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor, S
private volatile boolean dataReceived;
private final Acceptor acceptorUsed;
private OpenWireFormat wireFormat;
private AMQConnectionContext context;
@ -155,7 +155,7 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor, S
OpenWireFormat wf) {
this.protocolManager = openWireProtocolManager;
this.transportConnection = connection;
this.acceptorUsed = new AMQConnectorImpl(acceptorUsed);
this.acceptorUsed = acceptorUsed;
this.wireFormat = wf;
this.creationTime = System.currentTimeMillis();
}
@ -541,7 +541,6 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor, S
context.setConnectionId(info.getConnectionId());
// for now we pass the manager as the connector and see what happens
// it should be related to activemq's Acceptor
context.setConnector(this.acceptorUsed);
context.setFaultTolerant(info.isFaultTolerant());
context.setUserName(info.getUserName());
context.setWireFormatInfo(wireFormatInfo);
@ -565,7 +564,7 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor, S
}
if (info.isManageable()) {
// send ConnectionCommand
ConnectionControl command = this.acceptorUsed.getConnectionControl();
ConnectionControl command = new ConnectionControl();
command.setFaultTolerant(protocolManager.isFaultTolerantConfiguration());
if (info.isFailoverReconnect()) {
command.setRebalanceConnection(false);
@ -704,7 +703,6 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor, S
}
protected void doStop() throws Exception {
this.acceptorUsed.onStopped(this);
/*
* What's a duplex bridge? try { synchronized (this) { if (duplexBridge !=
* null) { duplexBridge.stop(); } } } catch (Exception ignore) {
@ -1007,7 +1005,8 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor, S
result = new AMQProducerBrokerExchange();
result.setConnectionContext(context);
//todo implement reconnect https://issues.apache.org/jira/browse/ARTEMIS-194
if (context.isReconnect() || (context.isNetworkConnection() && this.acceptorUsed.isAuditNetworkProducers())) {
//todo: this used to check for && this.acceptorUsed.isAuditNetworkProducers()
if (context.isReconnect() || (context.isNetworkConnection())) {
// once implemented ARTEMIS-194, we need to set the storedSequenceID here somehow
// We have different semantics on Artemis Journal, but we could adapt something for this
// TBD during the implemetnation of ARTEMIS-194
@ -1195,11 +1194,11 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor, S
}
public int getMaximumConsumersAllowedPerConnection() {
return this.acceptorUsed.getMaximumConsumersAllowedPerConnection();
return 1000000;//this belongs to configuration, now hardcoded
}
public int getMaximumProducersAllowedPerConnection() {
return this.acceptorUsed.getMaximumProducersAllowedPerConnection();
return 1000000;//this belongs to configuration, now hardcoded
}
public void deliverMessage(MessageDispatch dispatch) {

View File

@ -113,8 +113,6 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
private boolean prefixPacketSize = true;
private BrokerState brokerState;
private BrokerId brokerId;
protected final ProducerId advisoryProducerId = new ProducerId();
@ -143,7 +141,6 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
this.wireFactory = new OpenWireFormatFactory();
// preferred prop, should be done via config
wireFactory.setCacheEnabled(false);
brokerState = new BrokerState();
advisoryProducerId.setConnectionId(ID_GENERATOR.generateId());
ManagementService service = server.getManagementService();
scheduledPool = server.getScheduledPool();

View File

@ -30,7 +30,6 @@ import org.apache.activemq.state.ConnectionState;
public class AMQConnectionContext {
private OpenWireConnection connection;
private AMQConnector connector;
private OpenWireProtocolManager broker; //use protocol manager to represent the broker
private boolean inRecoveryMode;
private ConnectionId connectionId;
@ -67,7 +66,6 @@ public class AMQConnectionContext {
public AMQConnectionContext copy() {
AMQConnectionContext rc = new AMQConnectionContext(this.messageEvaluationContext);
rc.connection = this.connection;
rc.connector = this.connector;
rc.broker = this.broker;
rc.inRecoveryMode = this.inRecoveryMode;
rc.connectionId = this.connectionId;
@ -113,20 +111,6 @@ public class AMQConnectionContext {
this.connection = connection;
}
/**
* @return the connector being used.
*/
public AMQConnector getConnector() {
return connector;
}
/**
* @param connector being used.
*/
public void setConnector(AMQConnector connector) {
this.connector = connector;
}
/**
* @return
*/
@ -257,7 +241,10 @@ public class AMQConnectionContext {
}
public boolean isAllowLinkStealing() {
return connector != null && connector.isAllowLinkStealing();
// TODO: check what this means,
// on the activemq implementation this used to check on
// the connector, so this looks like a configuration option
return true;
}
}

View File

@ -1,74 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.protocol.openwire.amq;
import org.apache.activemq.command.BrokerInfo;
import org.apache.activemq.command.ConnectionControl;
import org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection;
public interface AMQConnector {
/**
* @return brokerInfo
*/
BrokerInfo getBrokerInfo();
/**
* @return the statistics for this connector
*/
AMQConnectorStatistics getStatistics();
/**
* @return true if update client connections when brokers leave/join a
* cluster
*/
boolean isUpdateClusterClients();
/**
* @return true if clients should be re-balanced across the cluster
*/
boolean isRebalanceClusterClients();
/**
* Update all the connections with information about the connected brokers in
* the cluster
*/
void updateClientClusterInfo();
/**
* @return true if clients should be updated when a broker is removed from a
* broker
*/
boolean isUpdateClusterClientsOnRemove();
int connectionCount();
/**
* If enabled, older connections with the same clientID are stopped
*
* @return true/false if link stealing is enabled
*/
boolean isAllowLinkStealing();
//see TransportConnector
ConnectionControl getConnectionControl();
void onStarted(OpenWireConnection connection);
void onStopped(OpenWireConnection connection);
}

View File

@ -1,101 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.protocol.openwire.amq;
import org.apache.activemq.management.CountStatisticImpl;
import org.apache.activemq.management.PollCountStatisticImpl;
import org.apache.activemq.management.StatsImpl;
public class AMQConnectorStatistics extends StatsImpl {
protected CountStatisticImpl enqueues;
protected CountStatisticImpl dequeues;
protected CountStatisticImpl consumers;
protected CountStatisticImpl messages;
protected PollCountStatisticImpl messagesCached;
public AMQConnectorStatistics() {
enqueues = new CountStatisticImpl("enqueues", "The number of messages that have been sent to the destination");
dequeues = new CountStatisticImpl("dequeues", "The number of messages that have been dispatched from the destination");
consumers = new CountStatisticImpl("consumers", "The number of consumers that that are subscribing to messages from the destination");
messages = new CountStatisticImpl("messages", "The number of messages that that are being held by the destination");
messagesCached = new PollCountStatisticImpl("messagesCached", "The number of messages that are held in the destination's memory cache");
addStatistic("enqueues", enqueues);
addStatistic("dequeues", dequeues);
addStatistic("consumers", consumers);
addStatistic("messages", messages);
addStatistic("messagesCached", messagesCached);
}
public CountStatisticImpl getEnqueues() {
return enqueues;
}
public CountStatisticImpl getDequeues() {
return dequeues;
}
public CountStatisticImpl getConsumers() {
return consumers;
}
public PollCountStatisticImpl getMessagesCached() {
return messagesCached;
}
public CountStatisticImpl getMessages() {
return messages;
}
public void reset() {
super.reset();
enqueues.reset();
dequeues.reset();
}
public void setEnabled(boolean enabled) {
super.setEnabled(enabled);
enqueues.setEnabled(enabled);
dequeues.setEnabled(enabled);
consumers.setEnabled(enabled);
messages.setEnabled(enabled);
messagesCached.setEnabled(enabled);
}
public void setParent(AMQConnectorStatistics parent) {
if (parent != null) {
enqueues.setParent(parent.enqueues);
dequeues.setParent(parent.dequeues);
consumers.setParent(parent.consumers);
messagesCached.setParent(parent.messagesCached);
messages.setParent(parent.messages);
}
else {
enqueues.setParent(null);
dequeues.setParent(null);
consumers.setParent(null);
messagesCached.setParent(null);
messages.setParent(null);
}
}
public void setMessagesCached(PollCountStatisticImpl messagesCached) {
this.messagesCached = messagesCached;
}
}