Added dup detection capability

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@551902 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2007-06-29 14:07:46 +00:00
parent 56be6b93d3
commit dd1d660149
4 changed files with 481 additions and 0 deletions

View File

@ -0,0 +1,130 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.activemq;
import java.util.LinkedHashMap;
import javax.jms.JMSException;
import javax.jms.Message;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.ProducerId;
import org.apache.activemq.util.BitArrayBin;
import org.apache.activemq.util.IdGenerator;
import org.apache.activemq.util.LRUCache;
/**
* Provides basic audit functions for Messages
*
* @version $Revision: 1.1.1.1 $
*/
public class ActiveMQMessageAudit{
private static final int DEFAULT_WINDOW_SIZE=1024;
private static final int MAXIMUM_PRODUCER_COUNT=128;
private int windowSize;
private LinkedHashMap<Object,BitArrayBin> map;
/**
* Default Constructor windowSize = 1024, maximumNumberOfProducersToTrack = 128
*/
public ActiveMQMessageAudit(){
this(DEFAULT_WINDOW_SIZE,MAXIMUM_PRODUCER_COUNT);
}
/**
* Construct a MessageAudit
*
* @param windowSize range of ids to track
* @param maximumNumberOfProducersToTrack number of producers expected in the system
*/
public ActiveMQMessageAudit(int windowSize,final int maximumNumberOfProducersToTrack){
this.windowSize=windowSize;
map=new LRUCache<Object,BitArrayBin>(maximumNumberOfProducersToTrack,maximumNumberOfProducersToTrack,0.75f,true);
}
/**
* Checks if this message has beeb seen before
*
* @param message
* @return true if the message is a duplicate
* @throws JMSException
*/
public boolean isDuplicateMessage(Message message) throws JMSException{
return isDuplicate(message.getJMSMessageID());
}
/**
* checks whether this messageId has been seen before and adds this messageId to the list
*
* @param id
* @return true if the message is a duplicate
*/
public synchronized boolean isDuplicate(String id){
boolean answer=false;
String seed=IdGenerator.getSeedFromId(id);
if(seed!=null){
BitArrayBin bab=map.get(seed);
if(bab==null){
bab=new BitArrayBin(windowSize);
map.put(seed,bab);
}
long index=IdGenerator.getSequenceFromId(id);
if(index>=0){
answer=bab.setBit(index,true);
}
}
return answer;
}
/**
* Checks if this message has beeb seen before
*
* @param message
* @return true if the message is a duplicate
*/
public synchronized boolean isDuplicateMessageReference(final MessageReference message){
boolean answer=false;
MessageId id=message.getMessageId();
if(id!=null){
ProducerId pid=id.getProducerId();
if(pid!=null){
BitArrayBin bab=map.get(pid);
if(bab==null){
bab=new BitArrayBin(windowSize);
map.put(pid,bab);
}
answer=bab.setBit(id.getProducerSequenceId(),true);
}
}
return answer;
}
/**
* uun mark this messager as being received
* @param message
*/
public synchronized void rollbackMessageReference(final MessageReference message){
MessageId id=message.getMessageId();
if(id!=null){
ProducerId pid=id.getProducerId();
if(pid!=null){
BitArrayBin bab=map.get(pid);
if(bab!=null){
bab.setBit(id.getProducerSequenceId(),false);
}
}
}
}
}

View File

@ -0,0 +1,152 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.util;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
/**
* Simple BitArray to enable setting multiple boolean values efficently Used instead of BitSet because BitSet does not
* allow for efficent serialization.
* Will store up to 64 boolean values
*
* @version $Revision: 1.1.1.1 $
*/
public class BitArray {
static final int LONG_SIZE = 64;
static final int INT_SIZE = 32;
static final int SHORT_SIZE = 16;
static final int BYTE_SIZE = 8;
private static final long[] BIT_VALUES = {0x0000000000000001L, 0x0000000000000002L, 0x0000000000000004L,
0x0000000000000008L, 0x0000000000000010L, 0x0000000000000020L, 0x0000000000000040L, 0x0000000000000080L,
0x0000000000000100L, 0x0000000000000200L, 0x0000000000000400L, 0x0000000000000800L, 0x0000000000001000L,
0x0000000000002000L, 0x0000000000004000L, 0x0000000000008000L, 0x0000000000010000L, 0x0000000000020000L,
0x0000000000040000L, 0x0000000000080000L, 0x0000000000100000L, 0x0000000000200000L, 0x0000000000400000L,
0x0000000000800000L, 0x0000000001000000L, 0x0000000002000000L, 0x0000000004000000L, 0x0000000008000000L,
0x0000000010000000L, 0x0000000020000000L, 0x0000000040000000L, 0x0000000080000000L, 0x0000000100000000L,
0x0000000200000000L, 0x0000000400000000L, 0x0000000800000000L, 0x0000001000000000L, 0x0000002000000000L,
0x0000004000000000L, 0x0000008000000000L, 0x0000010000000000L, 0x0000020000000000L, 0x0000040000000000L,
0x0000080000000000L, 0x0000100000000000L, 0x0000200000000000L, 0x0000400000000000L, 0x0000800000000000L,
0x0001000000000000L, 0x0002000000000000L, 0x0004000000000000L, 0x0008000000000000L, 0x0010000000000000L,
0x0020000000000000L, 0x0040000000000000L, 0x0080000000000000L, 0x0100000000000000L, 0x0200000000000000L,
0x0400000000000000L, 0x0800000000000000L, 0x1000000000000000L, 0x2000000000000000L, 0x4000000000000000L,
0x8000000000000000L};
private long bits;
private int length;
/**
* @return the length of bits set
*/
public int length() {
return length;
}
/**
* @return the long containing the bits
*/
public long getBits() {
return bits;
}
/**
* set the boolean value at the index
*
* @param index
* @param flag
* @return the old value held at this index
*/
public boolean set(int index, boolean flag) {
length = Math.max(length, index + 1);
boolean oldValue = (bits & BIT_VALUES[index]) != 0;
if (flag) {
bits |= BIT_VALUES[index];
}
else if (oldValue) {
bits &= ~(BIT_VALUES[index]);
}
return oldValue;
}
/**
* @param index
* @return the boolean value at this index
*/
public boolean get(int index) {
return (bits & BIT_VALUES[index]) != 0;
}
/**
* reset all the bit values to false
*/
public void reset(){
bits = 0;
}
/**
* reset all the bits to the value supplied
* @param bits
*/
public void reset(long bits){
this.bits = bits;
}
/**
* write the bits to an output stream
*
* @param dataOut
* @throws IOException
*/
public void writeToStream(DataOutput dataOut) throws IOException {
dataOut.writeByte(length);
if (length <= BYTE_SIZE) {
dataOut.writeByte((int) bits);
}
else if (length <= SHORT_SIZE) {
dataOut.writeShort((short) bits);
}
else if (length <= INT_SIZE) {
dataOut.writeInt((int) bits);
}
else {
dataOut.writeLong(bits);
}
}
/**
* read the bits from an input stream
*
* @param dataIn
* @throws IOException
*/
public void readFromStream(DataInput dataIn) throws IOException {
length = dataIn.readByte();
if (length <= BYTE_SIZE) {
bits = dataIn.readByte();
}
else if (length <= SHORT_SIZE) {
bits = dataIn.readShort();
}
else if (length <= INT_SIZE) {
bits=dataIn.readInt();
}
else {
bits = dataIn.readLong();
}
}
}

View File

@ -0,0 +1,139 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.activemq.util;
import java.util.LinkedList;
/**
* Holder for many bitArrays - used for message audit
*
* @version $Revision: 1.1.1.1 $
*/
public class BitArrayBin{
private LinkedList<BitArray> list;
private int maxNumberOfArrays;
private int firstIndex=-1;
private int firstBin=-1;
/**
* Create a BitArrayBin to a certain window size (number of messages to keep)
*
* @param windowSize
*/
public BitArrayBin(int windowSize){
maxNumberOfArrays=((windowSize+1)/BitArray.LONG_SIZE)+1;
maxNumberOfArrays=Math.max(maxNumberOfArrays,1);
list=new LinkedList<BitArray>();
for(int i=0;i<maxNumberOfArrays;i++){
list.add(new BitArray());
}
}
/**
* Set a bit
*
* @param index
* @param value
* @return true if set
*/
public boolean setBit(long index,boolean value){
boolean answer=true;
BitArray ba=getBitArray(index);
if(ba!=null){
int offset=getOffset(index);
if(offset>=0){
answer=ba.set(offset,value);
}
}
return answer;
}
/**
* Get the boolean value at the index
*
* @param index
* @return true/false
*/
public boolean getBit(long index){
boolean answer=index>=firstIndex;
BitArray ba=getBitArray(index);
if(ba!=null){
int offset=getOffset(index);
if(offset>=0){
answer=ba.get(offset);
return answer;
}
}else{
// gone passed range for previous bins so assume set
answer=true;
}
return answer;
}
/**
* Get the BitArray for the index
*
* @param index
* @return BitArray
*/
private BitArray getBitArray(long index){
int bin=getBin(index);
BitArray answer=null;
if(bin>=0){
if(firstIndex<0){
firstIndex=0;
}
if(bin>=list.size()){
list.removeFirst();
firstIndex+=BitArray.LONG_SIZE;
list.add(new BitArray());
bin=list.size()-1;
}
answer=list.get(bin);
}
return answer;
}
/**
* Get the index of the bin from the total index
*
* @param index
* @return the index of the bin
*/
private int getBin(long index){
int answer=0;
if(firstBin<0){
firstBin=0;
}else if(firstIndex>=0){
answer=(int)((index-firstIndex)/BitArray.LONG_SIZE);
}
return answer;
}
/**
* Get the offset into a bin from the total index
*
* @param index
* @return the relative offset into a bin
*/
private int getOffset(long index){
int answer=0;
if(firstIndex>=0){
answer=(int)((index-firstIndex)-(BitArray.LONG_SIZE*getBin(index)));
}
return answer;
}
}

View File

@ -109,4 +109,64 @@ public class IdGenerator{
return result;
}
/**
* From a generated id - return the seed (i.e. minus the count)
*
* @param id the generated identifer
* @return the seed
*/
public static String getSeedFromId(String id) {
String result = id;
if (id != null) {
int index = id.lastIndexOf(':');
if (index > 0 && (index + 1) < id.length()) {
result = id.substring(0, index + 1);
}
}
return result;
}
/**
* From a generated id - return the generator count
*
* @param id
* @return the count
*/
public static long getSequenceFromId(String id) {
long result = -1;
if (id != null) {
int index = id.lastIndexOf(':');
if (index > 0 && (index + 1) < id.length()) {
String numStr = id.substring(index + 1, id.length());
result = Long.parseLong(numStr);
}
}
return result;
}
/**
* Does a proper compare on the ids
*
* @param id1
* @param id2
* @return 0 if equal else a positive if id1 is > id2 ...
*/
public static int compare(String id1, String id2) {
int result = -1;
String seed1 = IdGenerator.getSeedFromId(id1);
String seed2 = IdGenerator.getSeedFromId(id2);
if (seed1 != null && seed2 != null) {
result = seed1.compareTo(seed2);
if (result == 0) {
long count1 = IdGenerator.getSequenceFromId(id1);
long count2 = IdGenerator.getSequenceFromId(id2);
result = (int) (count1 - count2);
}
}
return result;
}
}