mirror of https://github.com/apache/activemq.git
polish: some typos
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1394732 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
536e78208e
commit
a5eb7c6fa2
|
@ -67,7 +67,7 @@ public class Topic extends BaseDestination implements Task {
|
||||||
private final ReentrantReadWriteLock dispatchLock = new ReentrantReadWriteLock();
|
private final ReentrantReadWriteLock dispatchLock = new ReentrantReadWriteLock();
|
||||||
private DispatchPolicy dispatchPolicy = new SimpleDispatchPolicy();
|
private DispatchPolicy dispatchPolicy = new SimpleDispatchPolicy();
|
||||||
private SubscriptionRecoveryPolicy subscriptionRecoveryPolicy;
|
private SubscriptionRecoveryPolicy subscriptionRecoveryPolicy;
|
||||||
private final ConcurrentHashMap<SubscriptionKey, DurableTopicSubscription> durableSubcribers = new ConcurrentHashMap<SubscriptionKey, DurableTopicSubscription>();
|
private final ConcurrentHashMap<SubscriptionKey, DurableTopicSubscription> durableSubscribers = new ConcurrentHashMap<SubscriptionKey, DurableTopicSubscription>();
|
||||||
private final TaskRunner taskRunner;
|
private final TaskRunner taskRunner;
|
||||||
private final LinkedList<Runnable> messagesWaitingForSpace = new LinkedList<Runnable>();
|
private final LinkedList<Runnable> messagesWaitingForSpace = new LinkedList<Runnable>();
|
||||||
private final Runnable sendMessagesWaitingForSpaceTask = new Runnable() {
|
private final Runnable sendMessagesWaitingForSpaceTask = new Runnable() {
|
||||||
|
@ -171,7 +171,7 @@ public class Topic extends BaseDestination implements Task {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
durableSubcribers.put(dsub.getSubscriptionKey(), dsub);
|
durableSubscribers.put(dsub.getSubscriptionKey(), dsub);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -188,7 +188,7 @@ public class Topic extends BaseDestination implements Task {
|
||||||
public void deleteSubscription(ConnectionContext context, SubscriptionKey key) throws Exception {
|
public void deleteSubscription(ConnectionContext context, SubscriptionKey key) throws Exception {
|
||||||
if (topicStore != null) {
|
if (topicStore != null) {
|
||||||
topicStore.deleteSubscription(key.clientId, key.subscriptionName);
|
topicStore.deleteSubscription(key.clientId, key.subscriptionName);
|
||||||
DurableTopicSubscription removed = durableSubcribers.remove(key);
|
DurableTopicSubscription removed = durableSubscribers.remove(key);
|
||||||
if (removed != null) {
|
if (removed != null) {
|
||||||
destinationStatistics.getConsumers().decrement();
|
destinationStatistics.getConsumers().decrement();
|
||||||
// deactivate and remove
|
// deactivate and remove
|
||||||
|
@ -505,7 +505,7 @@ public class Topic extends BaseDestination implements Task {
|
||||||
}
|
}
|
||||||
|
|
||||||
private boolean canOptimizeOutPersistence() {
|
private boolean canOptimizeOutPersistence() {
|
||||||
return durableSubcribers.size() == 0;
|
return durableSubscribers.size() == 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -590,7 +590,7 @@ public class Topic extends BaseDestination implements Task {
|
||||||
});
|
});
|
||||||
final ConnectionContext connectionContext = createConnectionContext();
|
final ConnectionContext connectionContext = createConnectionContext();
|
||||||
for (Message message : toExpire) {
|
for (Message message : toExpire) {
|
||||||
for (DurableTopicSubscription sub : durableSubcribers.values()) {
|
for (DurableTopicSubscription sub : durableSubscribers.values()) {
|
||||||
if (!sub.isActive()) {
|
if (!sub.isActive()) {
|
||||||
messageExpired(connectionContext, sub, message);
|
messageExpired(connectionContext, sub, message);
|
||||||
}
|
}
|
||||||
|
@ -725,9 +725,9 @@ public class Topic extends BaseDestination implements Task {
|
||||||
protected boolean isOptimizeStorage(){
|
protected boolean isOptimizeStorage(){
|
||||||
boolean result = false;
|
boolean result = false;
|
||||||
|
|
||||||
if (isDoOptimzeMessageStorage() && durableSubcribers.isEmpty()==false){
|
if (isDoOptimzeMessageStorage() && durableSubscribers.isEmpty()==false){
|
||||||
result = true;
|
result = true;
|
||||||
for (DurableTopicSubscription s : durableSubcribers.values()) {
|
for (DurableTopicSubscription s : durableSubscribers.values()) {
|
||||||
if (s.isActive()== false){
|
if (s.isActive()== false){
|
||||||
result = false;
|
result = false;
|
||||||
break;
|
break;
|
||||||
|
@ -755,7 +755,7 @@ public class Topic extends BaseDestination implements Task {
|
||||||
public void clearPendingMessages() {
|
public void clearPendingMessages() {
|
||||||
dispatchLock.readLock().lock();
|
dispatchLock.readLock().lock();
|
||||||
try {
|
try {
|
||||||
for (DurableTopicSubscription durableTopicSubscription : durableSubcribers.values()) {
|
for (DurableTopicSubscription durableTopicSubscription : durableSubscribers.values()) {
|
||||||
clearPendingAndDispatch(durableTopicSubscription);
|
clearPendingAndDispatch(durableTopicSubscription);
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
|
@ -777,6 +777,6 @@ public class Topic extends BaseDestination implements Task {
|
||||||
}
|
}
|
||||||
|
|
||||||
public Map<SubscriptionKey, DurableTopicSubscription> getDurableTopicSubs() {
|
public Map<SubscriptionKey, DurableTopicSubscription> getDurableTopicSubs() {
|
||||||
return durableSubcribers;
|
return durableSubscribers;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -78,12 +78,12 @@ public class DurableSubscriptionSelectorTest extends org.apache.activemq.TestSup
|
||||||
|
|
||||||
Wait.waitFor(new Wait.Condition() { public boolean isSatisified() { return received >= 1;} }, 10000);
|
Wait.waitFor(new Wait.Condition() { public boolean isSatisified() { return received >= 1;} }, 10000);
|
||||||
|
|
||||||
assertEquals("Message is not recieved.", 1, received);
|
assertEquals("Message is not received.", 1, received);
|
||||||
|
|
||||||
sendMessage(true);
|
sendMessage(true);
|
||||||
Thread.sleep(100);
|
Thread.sleep(100);
|
||||||
|
|
||||||
assertEquals("Message is not recieved.", 2, received);
|
assertEquals("Message is not received.", 2, received);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void openConsumer() throws Exception {
|
private void openConsumer() throws Exception {
|
||||||
|
|
Loading…
Reference in New Issue