From dad506decbc27f54852efd52c19120f788a842d2 Mon Sep 17 00:00:00 2001 From: Robert Davies Date: Fri, 5 Sep 2008 14:10:36 +0000 Subject: [PATCH] Patch applied for https://issues.apache.org/activemq/browse/AMQ-1892 git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@692449 13f79535-47bb-0310-9956-ffa450edef68 --- .../activemq/plugin/DiscardingDLQBroker.java | 157 ++++++++++++++++++ .../plugin/DiscardingDLQBrokerPlugin.java | 114 +++++++++++++ 2 files changed, 271 insertions(+) create mode 100644 activemq-core/src/main/java/org/apache/activemq/plugin/DiscardingDLQBroker.java create mode 100644 activemq-core/src/main/java/org/apache/activemq/plugin/DiscardingDLQBrokerPlugin.java diff --git a/activemq-core/src/main/java/org/apache/activemq/plugin/DiscardingDLQBroker.java b/activemq-core/src/main/java/org/apache/activemq/plugin/DiscardingDLQBroker.java new file mode 100644 index 0000000000..b9ec3fd13d --- /dev/null +++ b/activemq-core/src/main/java/org/apache/activemq/plugin/DiscardingDLQBroker.java @@ -0,0 +1,157 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.activemq.plugin; + +import java.io.IOException; +import java.util.regex.Pattern; + +import org.apache.activemq.broker.Broker; +import org.apache.activemq.broker.BrokerFilter; +import org.apache.activemq.broker.ConnectionContext; +import org.apache.activemq.broker.region.MessageReference; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.Message; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +/** + * @author Filip Hanik + * @version 1.0 + */ +public class DiscardingDLQBroker extends BrokerFilter { + public static Log log = LogFactory.getLog(DiscardingDLQBroker.class); + private boolean dropTemporaryTopics = true; + private boolean dropTemporaryQueues = true; + private boolean dropAll = true; + private Pattern[] destFilter; + private int reportInterval = 1000; + private long dropCount = 0; + + public DiscardingDLQBroker(Broker next) { + super(next); + } + + @Override + public void sendToDeadLetterQueue(ConnectionContext ctx, MessageReference msgRef) { + if (log.isTraceEnabled()) { + try { + log.trace("Discarding DLQ BrokerFilter[pass through] - skipping message:" + (msgRef != null ? msgRef.getMessage() : null)); + } catch (IOException x) { + log.trace("Discarding DLQ BrokerFilter[pass through] - skipping message:" + msgRef != null ? msgRef : null, x); + } + } + boolean dropped = true; + Message msg = null; + ActiveMQDestination dest = null; + String destName = null; + try { + msg = msgRef.getMessage(); + dest = msg.getDestination(); + destName = dest.getPhysicalName(); + }catch (IOException x) { + if (log.isDebugEnabled()) { + log.debug("Unable to retrieve message or destination for message going to Dead Letter Queue. message skipped.", x); + } + } + + if (dest == null || destName == null ) { + //do nothing, no need to forward it + skipMessage("NULL DESTINATION",msgRef); + } else if (dropAll) { + //do nothing + skipMessage("dropAll",msgRef); + } else if (dropTemporaryTopics && dest.isTemporary() && dest.isTopic()) { + //do nothing + skipMessage("dropTemporaryTopics",msgRef); + } else if (dropTemporaryQueues && dest.isTemporary() && dest.isQueue()) { + //do nothing + skipMessage("dropTemporaryQueues",msgRef); + } else if (destFilter!=null && matches(destName)) { + //do nothing + skipMessage("dropOnly",msgRef); + } else { + dropped = false; + next.sendToDeadLetterQueue(ctx, msgRef); + } + if (dropped && getReportInterval()>0) { + if ((++dropCount)%getReportInterval() == 0 ) { + log.info("Total of "+dropCount+" messages were discarded, since their destination was the dead letter queue"); + } + } + } + + public boolean matches(String destName) { + for (int i=0; destFilter!=null && i list = new ArrayList(); + StringTokenizer t = new StringTokenizer(getDropOnly()," "); + while (t.hasMoreTokens()) { + String s = t.nextToken(); + if (s!=null && s.trim().length()>0) list.add(Pattern.compile(s)); + } + if (list.size()==0) return null; + return list.toArray(new Pattern[0]); + } +}