removing more dead code on openwire implementation

This commit is contained in:
Clebert Suconic 2015-08-28 14:11:11 -04:00
parent 3fbf75b2ff
commit 04ca86c3dd
6 changed files with 0 additions and 457 deletions

View File

@ -48,7 +48,6 @@ import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQMessageAuthoriz
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQProducerBrokerExchange;
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQSession;
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQSingleConsumerBrokerExchange;
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQTransaction;
import org.apache.activemq.artemis.core.remoting.CloseListener;
import org.apache.activemq.artemis.core.remoting.FailureListener;
import org.apache.activemq.artemis.core.security.SecurityAuth;
@ -565,7 +564,6 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor, S
context.setConnector(this.acceptorUsed);
context.setMessageAuthorizationPolicy(getMessageAuthorizationPolicy());
context.setFaultTolerant(faultTolerantConnection);
context.setTransactions(new ConcurrentHashMap<TransactionId, AMQTransaction>());
context.setUserName(info.getUserName());
context.setWireFormatInfo(wireFormatInfo);
context.setReconnect(info.isFailoverReconnect());

View File

@ -1,96 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.protocol.openwire.amq;
import org.apache.activemq.ActiveMQMessageAudit;
import org.apache.activemq.command.Message;
public abstract class AMQAbstractDeadLetterStrategy implements AMQDeadLetterStrategy {
private boolean processNonPersistent = false;
private boolean processExpired = true;
private boolean enableAudit = true;
private final ActiveMQMessageAudit messageAudit = new ActiveMQMessageAudit();
@Override
public void rollback(Message message) {
if (message != null && this.enableAudit) {
messageAudit.rollback(message);
}
}
@Override
public boolean isSendToDeadLetterQueue(Message message) {
boolean result = false;
if (message != null) {
result = true;
if (enableAudit && messageAudit.isDuplicate(message)) {
result = false;
// LOG.debug("Not adding duplicate to DLQ: {}, dest: {}",
// message.getMessageId(), message.getDestination());
}
if (!message.isPersistent() && !processNonPersistent) {
result = false;
}
if (message.isExpired() && !processExpired) {
result = false;
}
}
return result;
}
/**
* @return the processExpired
*/
@Override
public boolean isProcessExpired() {
return this.processExpired;
}
/**
* @param processExpired the processExpired to set
*/
@Override
public void setProcessExpired(boolean processExpired) {
this.processExpired = processExpired;
}
/**
* @return the processNonPersistent
*/
@Override
public boolean isProcessNonPersistent() {
return this.processNonPersistent;
}
/**
* @param processNonPersistent the processNonPersistent to set
*/
@Override
public void setProcessNonPersistent(boolean processNonPersistent) {
this.processNonPersistent = processNonPersistent;
}
public boolean isEnableAudit() {
return enableAudit;
}
public void setEnableAudit(boolean enableAudit) {
this.enableAudit = enableAudit;
}
}

View File

@ -37,8 +37,6 @@ public class AMQConnectionContext {
private AMQConnector connector;
private OpenWireProtocolManager broker; //use protocol manager to represent the broker
private boolean inRecoveryMode;
private AMQTransaction transaction;
private ConcurrentMap<TransactionId, AMQTransaction> transactions;
private AMQSecurityContext securityContext;
private ConnectionId connectionId;
private String clientId;
@ -78,8 +76,6 @@ public class AMQConnectionContext {
rc.connector = this.connector;
rc.broker = this.broker;
rc.inRecoveryMode = this.inRecoveryMode;
rc.transaction = this.transaction;
rc.transactions = this.transactions;
rc.securityContext = this.securityContext;
rc.connectionId = this.connectionId;
rc.clientId = this.clientId;
@ -139,20 +135,6 @@ public class AMQConnectionContext {
this.connection = connection;
}
/**
* @return the transaction being used.
*/
public AMQTransaction getTransaction() {
return transaction;
}
/**
* @param transaction being used.
*/
public void setTransaction(AMQTransaction transaction) {
this.transaction = transaction;
}
/**
* @return the connector being used.
*/
@ -190,18 +172,6 @@ public class AMQConnectionContext {
this.inRecoveryMode = inRecoveryMode;
}
public ConcurrentMap<TransactionId, AMQTransaction> getTransactions() {
return transactions;
}
public void setTransactions(ConcurrentMap<TransactionId, AMQTransaction> transactions) {
this.transactions = transactions;
}
public boolean isInTransaction() {
return transaction != null;
}
public String getClientId() {
return clientId;
}

View File

@ -1,69 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.protocol.openwire.amq;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.Message;
public interface AMQDeadLetterStrategy {
/**
* Allow pluggable strategy for deciding if message should be sent to a dead letter queue
* for example, you might not want to ignore expired or non-persistent messages
*
* @param message
* @return true if message should be sent to a dead letter queue
*/
boolean isSendToDeadLetterQueue(Message message);
/**
* Returns the dead letter queue for the given message and subscription.
*/
ActiveMQDestination getDeadLetterQueueFor(Message message, AMQSubscription subscription);
/**
* @return true if processes expired messages
*/
boolean isProcessExpired();
/**
* @param processExpired the processExpired to set
*/
void setProcessExpired(boolean processExpired);
/**
* @return the processNonPersistent
*/
boolean isProcessNonPersistent();
/**
* @param processNonPersistent the processNonPersistent to set
*/
void setProcessNonPersistent(boolean processNonPersistent);
boolean isDLQ(ActiveMQDestination destination);
/**
* Allows for a Message that was already processed by a DLQ to be rolled back in case
* of a move or a retry of that message, otherwise the Message would be considered a
* duplicate if this strategy is doing Message Auditing.
*
* @param message
*/
void rollback(Message message);
}

View File

@ -1,51 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.protocol.openwire.amq;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.Message;
public class AMQSharedDeadLetterStrategy extends AMQAbstractDeadLetterStrategy {
public static final String DEFAULT_DEAD_LETTER_QUEUE_NAME = "ActiveMQ.DLQ";
private ActiveMQDestination deadLetterQueue = new ActiveMQQueue(DEFAULT_DEAD_LETTER_QUEUE_NAME);
public ActiveMQDestination getDeadLetterQueueFor(Message message, AMQSubscription subscription) {
return deadLetterQueue;
}
public ActiveMQDestination getDeadLetterQueue() {
return deadLetterQueue;
}
public void setDeadLetterQueue(ActiveMQDestination deadLetterQueue) {
this.deadLetterQueue = deadLetterQueue;
}
@Override
public boolean isDLQ(ActiveMQDestination destination) {
if (destination.equals(deadLetterQueue)) {
return true;
}
else {
return false;
}
}
}

View File

@ -1,209 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.protocol.openwire.amq;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import javax.transaction.xa.XAException;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.transaction.Synchronization;
import org.slf4j.Logger;
public abstract class AMQTransaction {
public static final byte START_STATE = 0; // can go to: 1,2,3
public static final byte IN_USE_STATE = 1; // can go to: 2,3
public static final byte PREPARED_STATE = 2; // can go to: 3
public static final byte FINISHED_STATE = 3;
boolean committed = false;
private final ArrayList<Synchronization> synchronizations = new ArrayList<Synchronization>();
private byte state = START_STATE;
protected FutureTask<?> preCommitTask = new FutureTask<Object>(new Callable<Object>() {
public Object call() throws Exception {
doPreCommit();
return null;
}
});
protected FutureTask<?> postCommitTask = new FutureTask<Object>(new Callable<Object>() {
public Object call() throws Exception {
doPostCommit();
return null;
}
});
public byte getState() {
return state;
}
public void setState(byte state) {
this.state = state;
}
public boolean isCommitted() {
return committed;
}
public void setCommitted(boolean committed) {
this.committed = committed;
}
public void addSynchronization(Synchronization r) {
synchronizations.add(r);
if (state == START_STATE) {
state = IN_USE_STATE;
}
}
public Synchronization findMatching(Synchronization r) {
int existing = synchronizations.indexOf(r);
if (existing != -1) {
return synchronizations.get(existing);
}
return null;
}
public void removeSynchronization(Synchronization r) {
synchronizations.remove(r);
}
public void prePrepare() throws Exception {
// Is it ok to call prepare now given the state of the
// transaction?
switch (state) {
case START_STATE:
case IN_USE_STATE:
break;
default:
XAException xae = new XAException("Prepare cannot be called now.");
xae.errorCode = XAException.XAER_PROTO;
throw xae;
}
// // Run the prePrepareTasks
// for (Iterator iter = prePrepareTasks.iterator(); iter.hasNext();) {
// Callback r = (Callback) iter.next();
// r.execute();
// }
}
protected void fireBeforeCommit() throws Exception {
for (Iterator<Synchronization> iter = synchronizations.iterator(); iter.hasNext(); ) {
Synchronization s = iter.next();
s.beforeCommit();
}
}
protected void fireAfterCommit() throws Exception {
for (Iterator<Synchronization> iter = synchronizations.iterator(); iter.hasNext(); ) {
Synchronization s = iter.next();
s.afterCommit();
}
}
public void fireAfterRollback() throws Exception {
Collections.reverse(synchronizations);
for (Iterator<Synchronization> iter = synchronizations.iterator(); iter.hasNext(); ) {
Synchronization s = iter.next();
s.afterRollback();
}
}
@Override
public String toString() {
return "Local-" + getTransactionId() + "[synchronizations=" + synchronizations + "]";
}
public abstract void commit(boolean onePhase) throws XAException, IOException;
public abstract void rollback() throws XAException, IOException;
public abstract int prepare() throws XAException, IOException;
public abstract TransactionId getTransactionId();
public abstract Logger getLog();
public boolean isPrepared() {
return getState() == PREPARED_STATE;
}
public int size() {
return synchronizations.size();
}
protected void waitPostCommitDone(FutureTask<?> postCommitTask) throws XAException, IOException {
try {
postCommitTask.get();
}
catch (InterruptedException e) {
throw new InterruptedIOException(e.toString());
}
catch (ExecutionException e) {
Throwable t = e.getCause();
if (t instanceof XAException) {
throw (XAException) t;
}
else if (t instanceof IOException) {
throw (IOException) t;
}
else {
throw new XAException(e.toString());
}
}
}
protected void doPreCommit() throws XAException {
try {
fireBeforeCommit();
}
catch (Throwable e) {
// I guess this could happen. Post commit task failed
// to execute properly.
getLog().warn("PRE COMMIT FAILED: ", e);
XAException xae = new XAException("PRE COMMIT FAILED");
xae.errorCode = XAException.XAER_RMFAIL;
xae.initCause(e);
throw xae;
}
}
protected void doPostCommit() throws XAException {
try {
setCommitted(true);
fireAfterCommit();
}
catch (Throwable e) {
// I guess this could happen. Post commit task failed
// to execute properly.
getLog().warn("POST COMMIT FAILED: ", e);
XAException xae = new XAException("POST COMMIT FAILED");
xae.errorCode = XAException.XAER_RMFAIL;
xae.initCause(e);
throw xae;
}
}
}