From dd1d6601492e3d16a22e7de3d3762f0663dc83e3 Mon Sep 17 00:00:00 2001 From: Robert Davies Date: Fri, 29 Jun 2007 14:07:46 +0000 Subject: [PATCH] Added dup detection capability git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@551902 13f79535-47bb-0310-9956-ffa450edef68 --- .../apache/activemq/ActiveMQMessageAudit.java | 130 +++++++++++++++ .../org/apache/activemq/util/BitArray.java | 152 ++++++++++++++++++ .../org/apache/activemq/util/BitArrayBin.java | 139 ++++++++++++++++ .../org/apache/activemq/util/IdGenerator.java | 60 +++++++ 4 files changed, 481 insertions(+) create mode 100755 activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageAudit.java create mode 100755 activemq-core/src/main/java/org/apache/activemq/util/BitArray.java create mode 100755 activemq-core/src/main/java/org/apache/activemq/util/BitArrayBin.java diff --git a/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageAudit.java b/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageAudit.java new file mode 100755 index 0000000000..a77e6131b8 --- /dev/null +++ b/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageAudit.java @@ -0,0 +1,130 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package org.apache.activemq; + +import java.util.LinkedHashMap; +import javax.jms.JMSException; +import javax.jms.Message; +import org.apache.activemq.broker.region.MessageReference; +import org.apache.activemq.command.MessageId; +import org.apache.activemq.command.ProducerId; +import org.apache.activemq.util.BitArrayBin; +import org.apache.activemq.util.IdGenerator; +import org.apache.activemq.util.LRUCache; + +/** + * Provides basic audit functions for Messages + * + * @version $Revision: 1.1.1.1 $ + */ +public class ActiveMQMessageAudit{ + + private static final int DEFAULT_WINDOW_SIZE=1024; + private static final int MAXIMUM_PRODUCER_COUNT=128; + private int windowSize; + private LinkedHashMap map; + + /** + * Default Constructor windowSize = 1024, maximumNumberOfProducersToTrack = 128 + */ + public ActiveMQMessageAudit(){ + this(DEFAULT_WINDOW_SIZE,MAXIMUM_PRODUCER_COUNT); + } + + /** + * Construct a MessageAudit + * + * @param windowSize range of ids to track + * @param maximumNumberOfProducersToTrack number of producers expected in the system + */ + public ActiveMQMessageAudit(int windowSize,final int maximumNumberOfProducersToTrack){ + this.windowSize=windowSize; + map=new LRUCache(maximumNumberOfProducersToTrack,maximumNumberOfProducersToTrack,0.75f,true); + } + + /** + * Checks if this message has beeb seen before + * + * @param message + * @return true if the message is a duplicate + * @throws JMSException + */ + public boolean isDuplicateMessage(Message message) throws JMSException{ + return isDuplicate(message.getJMSMessageID()); + } + + /** + * checks whether this messageId has been seen before and adds this messageId to the list + * + * @param id + * @return true if the message is a duplicate + */ + public synchronized boolean isDuplicate(String id){ + boolean answer=false; + String seed=IdGenerator.getSeedFromId(id); + if(seed!=null){ + BitArrayBin bab=map.get(seed); + if(bab==null){ + bab=new BitArrayBin(windowSize); + map.put(seed,bab); + } + long index=IdGenerator.getSequenceFromId(id); + if(index>=0){ + answer=bab.setBit(index,true); + } + } + return answer; + } + + /** + * Checks if this message has beeb seen before + * + * @param message + * @return true if the message is a duplicate + */ + public synchronized boolean isDuplicateMessageReference(final MessageReference message){ + boolean answer=false; + MessageId id=message.getMessageId(); + if(id!=null){ + ProducerId pid=id.getProducerId(); + if(pid!=null){ + BitArrayBin bab=map.get(pid); + if(bab==null){ + bab=new BitArrayBin(windowSize); + map.put(pid,bab); + } + answer=bab.setBit(id.getProducerSequenceId(),true); + } + } + return answer; + } + + /** + * uun mark this messager as being received + * @param message + */ + public synchronized void rollbackMessageReference(final MessageReference message){ + MessageId id=message.getMessageId(); + if(id!=null){ + ProducerId pid=id.getProducerId(); + if(pid!=null){ + BitArrayBin bab=map.get(pid); + if(bab!=null){ + bab.setBit(id.getProducerSequenceId(),false); + } + } + } + } +} diff --git a/activemq-core/src/main/java/org/apache/activemq/util/BitArray.java b/activemq-core/src/main/java/org/apache/activemq/util/BitArray.java new file mode 100755 index 0000000000..832d6ba495 --- /dev/null +++ b/activemq-core/src/main/java/org/apache/activemq/util/BitArray.java @@ -0,0 +1,152 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.util; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +/** + * Simple BitArray to enable setting multiple boolean values efficently Used instead of BitSet because BitSet does not + * allow for efficent serialization. + * Will store up to 64 boolean values + * + * @version $Revision: 1.1.1.1 $ + */ +public class BitArray { + static final int LONG_SIZE = 64; + static final int INT_SIZE = 32; + static final int SHORT_SIZE = 16; + static final int BYTE_SIZE = 8; + private static final long[] BIT_VALUES = {0x0000000000000001L, 0x0000000000000002L, 0x0000000000000004L, + 0x0000000000000008L, 0x0000000000000010L, 0x0000000000000020L, 0x0000000000000040L, 0x0000000000000080L, + 0x0000000000000100L, 0x0000000000000200L, 0x0000000000000400L, 0x0000000000000800L, 0x0000000000001000L, + 0x0000000000002000L, 0x0000000000004000L, 0x0000000000008000L, 0x0000000000010000L, 0x0000000000020000L, + 0x0000000000040000L, 0x0000000000080000L, 0x0000000000100000L, 0x0000000000200000L, 0x0000000000400000L, + 0x0000000000800000L, 0x0000000001000000L, 0x0000000002000000L, 0x0000000004000000L, 0x0000000008000000L, + 0x0000000010000000L, 0x0000000020000000L, 0x0000000040000000L, 0x0000000080000000L, 0x0000000100000000L, + 0x0000000200000000L, 0x0000000400000000L, 0x0000000800000000L, 0x0000001000000000L, 0x0000002000000000L, + 0x0000004000000000L, 0x0000008000000000L, 0x0000010000000000L, 0x0000020000000000L, 0x0000040000000000L, + 0x0000080000000000L, 0x0000100000000000L, 0x0000200000000000L, 0x0000400000000000L, 0x0000800000000000L, + 0x0001000000000000L, 0x0002000000000000L, 0x0004000000000000L, 0x0008000000000000L, 0x0010000000000000L, + 0x0020000000000000L, 0x0040000000000000L, 0x0080000000000000L, 0x0100000000000000L, 0x0200000000000000L, + 0x0400000000000000L, 0x0800000000000000L, 0x1000000000000000L, 0x2000000000000000L, 0x4000000000000000L, + 0x8000000000000000L}; + private long bits; + private int length; + + /** + * @return the length of bits set + */ + public int length() { + return length; + } + + /** + * @return the long containing the bits + */ + public long getBits() { + return bits; + } + + /** + * set the boolean value at the index + * + * @param index + * @param flag + * @return the old value held at this index + */ + public boolean set(int index, boolean flag) { + length = Math.max(length, index + 1); + boolean oldValue = (bits & BIT_VALUES[index]) != 0; + if (flag) { + bits |= BIT_VALUES[index]; + } + else if (oldValue) { + bits &= ~(BIT_VALUES[index]); + } + return oldValue; + } + + /** + * @param index + * @return the boolean value at this index + */ + public boolean get(int index) { + return (bits & BIT_VALUES[index]) != 0; + } + + /** + * reset all the bit values to false + */ + public void reset(){ + bits = 0; + } + + /** + * reset all the bits to the value supplied + * @param bits + */ + public void reset(long bits){ + this.bits = bits; + } + + /** + * write the bits to an output stream + * + * @param dataOut + * @throws IOException + */ + public void writeToStream(DataOutput dataOut) throws IOException { + dataOut.writeByte(length); + if (length <= BYTE_SIZE) { + dataOut.writeByte((int) bits); + } + else if (length <= SHORT_SIZE) { + dataOut.writeShort((short) bits); + } + else if (length <= INT_SIZE) { + dataOut.writeInt((int) bits); + } + else { + dataOut.writeLong(bits); + } + } + + /** + * read the bits from an input stream + * + * @param dataIn + * @throws IOException + */ + public void readFromStream(DataInput dataIn) throws IOException { + length = dataIn.readByte(); + if (length <= BYTE_SIZE) { + bits = dataIn.readByte(); + } + else if (length <= SHORT_SIZE) { + bits = dataIn.readShort(); + } + else if (length <= INT_SIZE) { + bits=dataIn.readInt(); + } + else { + bits = dataIn.readLong(); + } + } +} \ No newline at end of file diff --git a/activemq-core/src/main/java/org/apache/activemq/util/BitArrayBin.java b/activemq-core/src/main/java/org/apache/activemq/util/BitArrayBin.java new file mode 100755 index 0000000000..b998222122 --- /dev/null +++ b/activemq-core/src/main/java/org/apache/activemq/util/BitArrayBin.java @@ -0,0 +1,139 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package org.apache.activemq.util; + +import java.util.LinkedList; + +/** + * Holder for many bitArrays - used for message audit + * + * @version $Revision: 1.1.1.1 $ + */ +public class BitArrayBin{ + + private LinkedList list; + private int maxNumberOfArrays; + private int firstIndex=-1; + private int firstBin=-1; + + /** + * Create a BitArrayBin to a certain window size (number of messages to keep) + * + * @param windowSize + */ + public BitArrayBin(int windowSize){ + maxNumberOfArrays=((windowSize+1)/BitArray.LONG_SIZE)+1; + maxNumberOfArrays=Math.max(maxNumberOfArrays,1); + list=new LinkedList(); + for(int i=0;i=0){ + answer=ba.set(offset,value); + } + } + return answer; + } + + /** + * Get the boolean value at the index + * + * @param index + * @return true/false + */ + public boolean getBit(long index){ + boolean answer=index>=firstIndex; + BitArray ba=getBitArray(index); + if(ba!=null){ + int offset=getOffset(index); + if(offset>=0){ + answer=ba.get(offset); + return answer; + } + }else{ + // gone passed range for previous bins so assume set + answer=true; + } + return answer; + } + + /** + * Get the BitArray for the index + * + * @param index + * @return BitArray + */ + private BitArray getBitArray(long index){ + int bin=getBin(index); + BitArray answer=null; + if(bin>=0){ + if(firstIndex<0){ + firstIndex=0; + } + if(bin>=list.size()){ + list.removeFirst(); + firstIndex+=BitArray.LONG_SIZE; + list.add(new BitArray()); + bin=list.size()-1; + } + answer=list.get(bin); + } + return answer; + } + + /** + * Get the index of the bin from the total index + * + * @param index + * @return the index of the bin + */ + private int getBin(long index){ + int answer=0; + if(firstBin<0){ + firstBin=0; + }else if(firstIndex>=0){ + answer=(int)((index-firstIndex)/BitArray.LONG_SIZE); + } + return answer; + } + + /** + * Get the offset into a bin from the total index + * + * @param index + * @return the relative offset into a bin + */ + private int getOffset(long index){ + int answer=0; + if(firstIndex>=0){ + answer=(int)((index-firstIndex)-(BitArray.LONG_SIZE*getBin(index))); + } + return answer; + } +} \ No newline at end of file diff --git a/activemq-core/src/main/java/org/apache/activemq/util/IdGenerator.java b/activemq-core/src/main/java/org/apache/activemq/util/IdGenerator.java index 03d8c31df2..9c6147f7df 100755 --- a/activemq-core/src/main/java/org/apache/activemq/util/IdGenerator.java +++ b/activemq-core/src/main/java/org/apache/activemq/util/IdGenerator.java @@ -108,5 +108,65 @@ public class IdGenerator{ result = result.replace('.', '-'); return result; } + + /** + * From a generated id - return the seed (i.e. minus the count) + * + * @param id the generated identifer + * @return the seed + */ + public static String getSeedFromId(String id) { + String result = id; + if (id != null) { + int index = id.lastIndexOf(':'); + if (index > 0 && (index + 1) < id.length()) { + result = id.substring(0, index + 1); + } + } + return result; + } + + /** + * From a generated id - return the generator count + * + * @param id + * @return the count + */ + public static long getSequenceFromId(String id) { + long result = -1; + if (id != null) { + int index = id.lastIndexOf(':'); + + if (index > 0 && (index + 1) < id.length()) { + String numStr = id.substring(index + 1, id.length()); + result = Long.parseLong(numStr); + } + } + return result; + } + + /** + * Does a proper compare on the ids + * + * @param id1 + * @param id2 + * @return 0 if equal else a positive if id1 is > id2 ... + */ + + public static int compare(String id1, String id2) { + int result = -1; + String seed1 = IdGenerator.getSeedFromId(id1); + String seed2 = IdGenerator.getSeedFromId(id2); + if (seed1 != null && seed2 != null) { + result = seed1.compareTo(seed2); + if (result == 0) { + long count1 = IdGenerator.getSequenceFromId(id1); + long count2 = IdGenerator.getSequenceFromId(id2); + result = (int) (count1 - count2); + } + } + return result; + + } }