bad commit - removing files

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@387592 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2006-03-21 18:04:05 +00:00
parent d56eda03db
commit 739d12ac20
4 changed files with 0 additions and 637 deletions

View File

@ -1,107 +0,0 @@
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.store.memory;
import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.MessageStore;
/**
* An implementation of {@link org.apache.activemq.store.MessageStore} which uses a
*
* @version $Revision: 1.7 $
*/
public class MemoryMessageStore implements MessageStore {
protected final ActiveMQDestination destination;
protected final Map messageTable;
public MemoryMessageStore(ActiveMQDestination destination) {
this(destination, new LinkedHashMap());
}
public MemoryMessageStore(ActiveMQDestination destination, Map messageTable) {
this.destination = destination;
this.messageTable = Collections.synchronizedMap(messageTable);
}
public synchronized void addMessage(ConnectionContext context, Message message) throws IOException {
messageTable.put(message.getMessageId(), message);
}
public void addMessageReference(ConnectionContext context, MessageId messageId, long expirationTime, String messageRef) throws IOException {
messageTable.put(messageId, messageRef);
}
public Message getMessage(MessageId identity) throws IOException {
return (Message) messageTable.get(identity);
}
public String getMessageReference(MessageId identity) throws IOException {
return (String) messageTable.get(identity);
}
public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException {
messageTable.remove(ack.getLastMessageId());
}
public void removeMessage(MessageId msgId) throws IOException {
messageTable.remove(msgId);
}
public void recover(MessageRecoveryListener listener) throws Exception {
// the message table is a synchronizedMap - so just have to synchronize here
synchronized(messageTable){
for(Iterator iter=messageTable.values().iterator();iter.hasNext();){
Object msg=(Object) iter.next();
if(msg.getClass()==String.class){
listener.recoverMessageReference((String) msg);
}else{
listener.recoverMessage((Message) msg);
}
}
listener.finished();
}
}
public void start() throws IOException {
}
public void stop(long timeout) throws IOException {
}
public void removeAllMessages(ConnectionContext context) throws IOException {
messageTable.clear();
}
public ActiveMQDestination getDestination() {
return destination;
}
public void delete() {
messageTable.clear();
}
}

View File

@ -1,150 +0,0 @@
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.store.memory;
import java.io.File;
import java.io.IOException;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.store.TransactionStore;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
/**
* @org.apache.xbean.XBean
*
* @version $Revision: 1.4 $
*/
public class MemoryPersistenceAdapter implements PersistenceAdapter {
private static final Log log = LogFactory.getLog(MemoryPersistenceAdapter.class);
MemoryTransactionStore transactionStore;
ConcurrentHashMap topics = new ConcurrentHashMap();
ConcurrentHashMap queues = new ConcurrentHashMap();
private boolean useExternalMessageReferences;
public Set getDestinations() {
Set rc = new HashSet(queues.size()+topics.size());
for (Iterator iter = queues.keySet().iterator(); iter.hasNext();) {
rc.add( iter.next() );
}
for (Iterator iter = topics.keySet().iterator(); iter.hasNext();) {
rc.add( iter.next() );
}
return rc;
}
public static MemoryPersistenceAdapter newInstance(File file) {
return new MemoryPersistenceAdapter();
}
public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
MessageStore rc = (MessageStore)queues.get(destination);
if(rc==null) {
rc = new MemoryMessageStore(destination);
if( transactionStore !=null ) {
rc = transactionStore.proxy(rc);
}
queues.put(destination, rc);
}
return rc;
}
public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException {
TopicMessageStore rc = (TopicMessageStore)topics.get(destination);
if(rc==null) {
rc = new MemoryTopicMessageStore(destination);
if( transactionStore !=null ) {
rc = transactionStore.proxy(rc);
}
topics.put(destination, rc);
}
return rc;
}
public TransactionStore createTransactionStore() throws IOException {
if( transactionStore==null ) {
transactionStore = new MemoryTransactionStore();
}
return transactionStore;
}
public void beginTransaction(ConnectionContext context) {
}
public void commitTransaction(ConnectionContext context) {
}
public void rollbackTransaction(ConnectionContext context) {
}
public void start() throws Exception {
}
public void stop() throws Exception {
}
public long getLastMessageBrokerSequenceId() throws IOException {
return 0;
}
public void deleteAllMessages() throws IOException {
for (Iterator iter = topics.values().iterator(); iter.hasNext();) {
MemoryMessageStore store = asMemoryMessageStore(iter.next());
if (store != null) {
store.delete();
}
}
for (Iterator iter = queues.values().iterator(); iter.hasNext();) {
MemoryMessageStore store = asMemoryMessageStore(iter.next());
if (store != null) {
store.delete();
}
}
if (transactionStore != null) {
transactionStore.delete();
}
}
public boolean isUseExternalMessageReferences() {
return useExternalMessageReferences;
}
public void setUseExternalMessageReferences(boolean useExternalMessageReferences) {
this.useExternalMessageReferences = useExternalMessageReferences;
}
protected MemoryMessageStore asMemoryMessageStore(Object value) {
if (value instanceof MemoryMessageStore) {
return (MemoryMessageStore) value;
}
log.warn("Expected an instance of MemoryMessageStore but was: " + value);
return null;
}
}

View File

@ -1,124 +0,0 @@
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.store.memory;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Map.Entry;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.util.SubscriptionKey;
/**
* @version $Revision: 1.5 $
*/
public class MemoryTopicMessageStore extends MemoryMessageStore implements TopicMessageStore {
private Map ackDatabase;
private Map subscriberDatabase;
MessageId lastMessageId;
public MemoryTopicMessageStore(ActiveMQDestination destination) {
this(destination, new LinkedHashMap(), makeMap(), makeMap());
}
protected static Map makeMap() {
return Collections.synchronizedMap(new HashMap());
}
public MemoryTopicMessageStore(ActiveMQDestination destination, Map messageTable, Map subscriberDatabase, Map ackDatabase) {
super(destination, messageTable);
this.subscriberDatabase = subscriberDatabase;
this.ackDatabase = ackDatabase;
}
public synchronized void addMessage(ConnectionContext context, Message message) throws IOException {
super.addMessage(context, message);
lastMessageId = message.getMessageId();
}
public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, MessageId messageId) throws IOException {
ackDatabase.put(new SubscriptionKey(clientId, subscriptionName), messageId);
}
public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException {
return (SubscriptionInfo) subscriberDatabase.get(new SubscriptionKey(clientId, subscriptionName));
}
public void addSubsciption(String clientId, String subscriptionName, String selector, boolean retroactive) throws IOException {
SubscriptionInfo info = new SubscriptionInfo();
info.setDestination(destination);
info.setClientId(clientId);
info.setSelector(selector);
info.setSubcriptionName(subscriptionName);
SubscriptionKey key = new SubscriptionKey(clientId, subscriptionName);
subscriberDatabase.put(key, info);
MessageId l=retroactive ? null : lastMessageId;
if( l!=null ) {
ackDatabase.put(key, l);
}
}
public void deleteSubscription(String clientId, String subscriptionName) {
org.apache.activemq.util.SubscriptionKey key = new SubscriptionKey(clientId, subscriptionName);
ackDatabase.remove(key);
subscriberDatabase.remove(key);
}
public void recoverSubscription(String clientId,String subscriptionName,MessageRecoveryListener listener)
throws Exception{
MessageId lastAck=(MessageId) ackDatabase.get(new SubscriptionKey(clientId,subscriptionName));
boolean pastLastAck=lastAck==null;
// the message table is a synchronizedMap - so just have to synchronize here
synchronized(messageTable){
for(Iterator iter=messageTable.entrySet().iterator();iter.hasNext();){
Map.Entry entry=(Entry) iter.next();
if(pastLastAck){
Object msg=entry.getValue();
if(msg.getClass()==String.class){
listener.recoverMessageReference((String) msg);
}else{
listener.recoverMessage((Message) msg);
}
}else{
pastLastAck=entry.getKey().equals(lastAck);
}
}
listener.finished();
}
}
public void delete() {
super.delete();
ackDatabase.clear();
subscriberDatabase.clear();
lastMessageId=null;
}
public SubscriptionInfo[] getAllSubscriptions() throws IOException {
return (SubscriptionInfo[]) subscriberDatabase.values().toArray(new SubscriptionInfo[subscriberDatabase.size()]);
}
}

View File

@ -1,256 +0,0 @@
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.store.memory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import javax.transaction.xa.XAException;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.command.XATransactionId;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.ProxyMessageStore;
import org.apache.activemq.store.ProxyTopicMessageStore;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.store.TransactionRecoveryListener;
import org.apache.activemq.store.TransactionStore;
import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
/**
* Provides a TransactionStore implementation that can create transaction aware
* MessageStore objects from non transaction aware MessageStore objects.
*
* @version $Revision: 1.4 $
*/
public class MemoryTransactionStore implements TransactionStore {
ConcurrentHashMap inflightTransactions = new ConcurrentHashMap();
ConcurrentHashMap preparedTransactions = new ConcurrentHashMap();
private boolean doingRecover;
public static class Tx {
private ArrayList messages = new ArrayList();
private ArrayList acks = new ArrayList();
public void add(AddMessageCommand msg) {
messages.add(msg);
}
public void add(RemoveMessageCommand ack) {
acks.add(ack);
}
public Message[] getMessages() {
Message rc[] = new Message[messages.size()];
int count=0;
for (Iterator iter = messages.iterator(); iter.hasNext();) {
AddMessageCommand cmd = (AddMessageCommand) iter.next();
rc[count++] = cmd.getMessage();
}
return rc;
}
public MessageAck[] getAcks() {
MessageAck rc[] = new MessageAck[acks.size()];
int count=0;
for (Iterator iter = acks.iterator(); iter.hasNext();) {
RemoveMessageCommand cmd = (RemoveMessageCommand) iter.next();
rc[count++] = cmd.getMessageAck();
}
return rc;
}
/**
* @throws IOException
*/
public void commit() throws IOException {
// Do all the message adds.
for (Iterator iter = messages.iterator(); iter.hasNext();) {
AddMessageCommand cmd = (AddMessageCommand) iter.next();
cmd.run();
}
// And removes..
for (Iterator iter = acks.iterator(); iter.hasNext();) {
RemoveMessageCommand cmd = (RemoveMessageCommand) iter.next();
cmd.run();
}
}
}
public interface AddMessageCommand {
Message getMessage();
void run() throws IOException;
}
public interface RemoveMessageCommand {
MessageAck getMessageAck();
void run() throws IOException;
}
public MessageStore proxy(MessageStore messageStore) {
return new ProxyMessageStore(messageStore) {
public void addMessage(ConnectionContext context, final Message send) throws IOException {
MemoryTransactionStore.this.addMessage(getDelegate(), send);
}
public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException {
MemoryTransactionStore.this.removeMessage(getDelegate(), ack);
}
};
}
public TopicMessageStore proxy(TopicMessageStore messageStore) {
return new ProxyTopicMessageStore(messageStore) {
public void addMessage(ConnectionContext context, final Message send) throws IOException {
MemoryTransactionStore.this.addMessage(getDelegate(), send);
}
public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException {
MemoryTransactionStore.this.removeMessage(getDelegate(), ack);
}
};
}
/**
* @see org.apache.activemq.store.TransactionStore#prepare(TransactionId)
*/
public void prepare(TransactionId txid) {
Tx tx = (Tx) inflightTransactions.remove(txid);
if (tx == null)
return;
preparedTransactions.put(txid, tx);
}
public Tx getTx(Object txid) {
Tx tx = (Tx) inflightTransactions.get(txid);
if (tx == null) {
tx = new Tx();
inflightTransactions.put(txid, tx);
}
return tx;
}
/**
* @throws XAException
* @see org.apache.activemq.store.TransactionStore#commit(org.apache.activemq.service.Transaction)
*/
public void commit(TransactionId txid, boolean wasPrepared) throws IOException {
Tx tx;
if( wasPrepared ) {
tx = (Tx) preparedTransactions.remove(txid);
} else {
tx = (Tx) inflightTransactions.remove(txid);
}
if( tx == null )
return;
tx.commit();
}
/**
* @see org.apache.activemq.store.TransactionStore#rollback(TransactionId)
*/
public void rollback(TransactionId txid) {
preparedTransactions.remove(txid);
inflightTransactions.remove(txid);
}
public void start() throws Exception {
}
public void stop() throws Exception {
}
synchronized public void recover(TransactionRecoveryListener listener) throws IOException {
// All the inflight transactions get rolled back..
inflightTransactions.clear();
this.doingRecover = true;
try {
for (Iterator iter = preparedTransactions.keySet().iterator(); iter.hasNext();) {
Object txid = (Object) iter.next();
Tx tx = (Tx) preparedTransactions.get(txid);
listener.recover((XATransactionId) txid, tx.getMessages(), tx.getAcks());
}
} finally {
this.doingRecover = false;
}
}
/**
* @param message
* @throws IOException
*/
void addMessage(final MessageStore destination, final Message message) throws IOException {
if( doingRecover )
return;
if (message.getTransactionId()!=null) {
Tx tx = getTx(message.getTransactionId());
tx.add(new AddMessageCommand() {
public Message getMessage() {
return message;
}
public void run() throws IOException {
destination.addMessage(null, message);
}
});
} else {
destination.addMessage(null, message);
}
}
/**
* @param ack
* @throws IOException
*/
private void removeMessage(final MessageStore destination,final MessageAck ack) throws IOException {
if( doingRecover )
return;
if (ack.isInTransaction()) {
Tx tx = getTx(ack.getTransactionId());
tx.add(new RemoveMessageCommand() {
public MessageAck getMessageAck() {
return ack;
}
public void run() throws IOException {
destination.removeMessage(null, ack);
}
});
} else {
destination.removeMessage(null, ack);
}
}
public void delete() {
inflightTransactions.clear();
preparedTransactions.clear();
doingRecover=false;
}
}