https://issues.apache.org/jira/browse/AMQ-4485 - re-enable test. The concurrentStoreAndDispatch case cannot be reconciled via setBatch; the best we can do is trap duplicates from in-flight messages as they occur. Async store adds are now tagged to support this. Reverting the serialization of tx and non-async sends brings performance back to where it was. https://issues.apache.org/jira/browse/AMQ-5266

gtully 2014-10-07 14:49:58 +01:00
parent c1c82beb2d
commit 140ce1bc8f
6 changed files with 85 additions and 141 deletions
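
Background for the change set: the duplicate handling below only matters when KahaDB's concurrentStoreAndDispatchQueues option is on, where a queue message can be dispatched before its async store task completes. A minimal sketch of toggling that option on an embedded broker (the standalone class and variable names are illustrative; the setConcurrentStoreAndDispatchQueues call itself appears in the TwoBrokerQueueClientsReconnectTest change at the end of this diff):

import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;

public class ConcurrentStoreAndDispatchSketch {
    public static void main(String[] args) throws Exception {
        BrokerService broker = new BrokerService();
        broker.setPersistent(true);

        // KahaDB is the default persistence adapter; with concurrentStoreAndDispatchQueues=true
        // the store add runs async, so the cursor may later see the store replay an in-flight
        // message - the AbstractStoreCursor change below traps that case instead of warning.
        KahaDBPersistenceAdapter kahaDB = (KahaDBPersistenceAdapter) broker.getPersistenceAdapter();
        kahaDB.setConcurrentStoreAndDispatchQueues(true); // set false to serialize store and dispatch

        broker.start();
        try {
            // ... run producers/consumers ...
        } finally {
            broker.stop();
        }
    }
}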


@@ -18,7 +18,6 @@ package org.apache.activemq.broker.region.cursors;
 import java.util.Iterator;
 import java.util.LinkedList;
-import java.util.concurrent.CancellationException;
 import java.util.concurrent.Future;
 import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.broker.region.MessageReference;
@@ -100,13 +99,19 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
             clearIterator(true);
             recovered = true;
             storeHasMessages = true;
+        } else if (!cached) {
+            // a duplicate from the store (!cached) - needs to be removed/acked - otherwise it will get re dispatched on restart
+            if (message.isRecievedByDFBridge()) {
+                // expected for messages pending acks with kahadb.concurrentStoreAndDispatchQueues=true
+                LOG.trace("{} store replayed pending message due to concurrentStoreAndDispatchQueues {} seq: {}", this, message.getMessageId(), message.getMessageId().getFutureOrSequenceLong());
+            } else {
+                LOG.warn("{} - cursor got duplicate from store {} seq: {}", this, message.getMessageId(), message.getMessageId().getFutureOrSequenceLong());
+                duplicate(message);
+            }
         } else {
-            LOG.warn("{} - cursor got duplicate {} seq: {}", this, message.getMessageId(), message.getMessageId().getFutureOrSequenceLong());
-            // a duplicate from the store - needs to be removed/acked - otherwise it will get redispatched on restart
-            // jdbc store will store duplicates and will set entry locator to sequence long.
-            // REVISIT - this seems too hacky - see use case AMQ4952Test
-            if (!cached || message.getMessageId().getEntryLocator() instanceof Long) {
+            LOG.warn("{} - cursor got duplicate send {} seq: {}", this, message.getMessageId(), message.getMessageId().getFutureOrSequenceLong());
+            if (message.getMessageId().getEntryLocator() instanceof Long) {
+                // JDBC will store a duplicate (with new sequence id) - it needs an ack (AMQ4952Test)
                 duplicate(message);
             }
         }
@@ -219,10 +224,11 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
         if (!pendingCachedIds.isEmpty() || lastCachedId != null) {
             LOG.trace("{} - disabling cache. current Id: {} seq: {}, batchList size: {}",
                     new Object[]{this, node.getMessageId(), node.getMessageId().getFutureOrSequenceLong(), batchList.size()});
-            collapseLastCachedIds();
+            pruneLastCached();
             if (lastCachedId != null) {
                 setBatch(lastCachedId);
                 lastCachedId = null;
+                pendingCachedIds.clear();
             }
         }
     }
@@ -240,6 +246,8 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
                 Future future = (Future) futureOrLong;
                 if (future.isCancelled()) {
                     it.remove();
+                } else {
+                    break;
                 }
             } else {
                 // store complete - track via lastCachedId
@@ -249,25 +257,8 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
         }
     }
-    private void collapseLastCachedIds() throws Exception {
-        for (MessageId candidate : pendingCachedIds) {
-            final Object futureOrLong = candidate.getFutureOrSequenceLong();
-            if (futureOrLong instanceof Future) {
-                Future future = (Future) futureOrLong;
-                try {
-                    future.get();
-                    // future should be replaced with sequence by this time
-                } catch (CancellationException ignored) {
-                    continue;
-                }
-            }
-            setLastCachedId(candidate);
-        }
-        pendingCachedIds.clear();
-    }
     private void setLastCachedId(MessageId candidate) {
-        if (lastCachedId == null) {
+        if (lastCachedId == null || lastCachedId.getFutureOrSequenceLong() == null) { // possibly null for topics
             lastCachedId = candidate;
         } else if (Long.compare(((Long) candidate.getFutureOrSequenceLong()), ((Long) lastCachedId.getFutureOrSequenceLong())) > 0) {
             lastCachedId = candidate;
@@ -360,7 +351,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
     public String toString() {
         return super.toString() + ":" + regionDestination.getActiveMQDestination().getPhysicalName() + ",batchResetNeeded=" + batchResetNeeded
                 + ",storeHasMessages=" + this.storeHasMessages + ",size=" + this.size + ",cacheEnabled=" + isCacheEnabled()
-                + ",maxBatchSize:" + maxBatchSize + ",hasSpace:" + hasSpace() + ",pendingCachedIds.size:" + pendingCachedIds.size() + ",lastCachedId:" + lastCachedId;
+                + ",maxBatchSize:" + maxBatchSize + ",hasSpace:" + hasSpace() + ",pendingCachedIds.size:" + pendingCachedIds.size() + ",lastCachedId:" + lastCachedId + ",lastCachedId-seq:" + (lastCachedId != null ? lastCachedId.getFutureOrSequenceLong() : "null");
     }
     protected abstract void doFillBatch() throws Exception;


@@ -30,7 +30,6 @@ import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Callable;
-import java.util.concurrent.CancellationException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.FutureTask;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -38,12 +37,11 @@ import java.util.concurrent.Semaphore;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.activemq.broker.ConnectionContext;
-import org.apache.activemq.broker.region.Destination;
-import org.apache.activemq.broker.region.RegionBroker;
 import org.apache.activemq.broker.scheduler.JobSchedulerStore;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQQueue;
@@ -86,7 +84,7 @@ import org.apache.activemq.wireformat.WireFormat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-public class KahaDBStore extends MessageDatabase implements PersistenceAdapter, MessageDatabase.SerialExecution<Location> {
+public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
     static final Logger LOG = LoggerFactory.getLogger(KahaDBStore.class);
     private static final int MAX_ASYNC_JOBS = 10000;
@@ -124,7 +122,6 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter,
                 return txid;
             }
         };
-        serialExecutor = this;
     }
     @Override
@@ -347,17 +344,6 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter,
         this.forceRecoverIndex = forceRecoverIndex;
     }
-    @Override
-    public Location execute(Callable<Location> c) throws Exception {
-        if (isConcurrentStoreAndDispatchQueues()) {
-            FutureTask<Location> future = new FutureTask<>(c);
-            this.queueExecutor.execute(future);
-            return future.get();
-        } else {
-            return c.call();
-        }
-    }
     public class KahaDBMessageStore extends AbstractMessageStore {
         protected final Map<AsyncJobKey, StoreTask> asyncTaskMap = new HashMap<AsyncJobKey, StoreTask>();
         protected KahaDestination dest;
@@ -383,25 +369,14 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter,
                 throws IOException {
             if (isConcurrentStoreAndDispatchQueues()) {
                 StoreQueueTask result = new StoreQueueTask(this, context, message);
+                ListenableFuture<Object> future = result.getFuture();
+                message.getMessageId().setFutureOrSequenceLong(future);
+                message.setRecievedByDFBridge(true); // flag message as concurrentStoreAndDispatch
                 result.aquireLocks();
                 addQueueTask(this, result);
-                final ListenableFuture<Object> future = result.getFuture();
                 if (indexListener != null) {
                     // allow concurrent dispatch by setting entry locator,
-                    // wait for add completion to remove potential pending addition
-                    message.getMessageId().setFutureOrSequenceLong(future);
-                    indexListener.onAdd(new IndexListener.MessageContext(context, message, new Runnable() {
-                        @Override
-                        public void run() {
-                            try {
-                                future.get();
-                                trackPendingAddComplete(dest, (Long) message.getMessageId().getFutureOrSequenceLong());
-                            } catch (CancellationException okNothingToTrack) {
-                            } catch (Exception e) {
-                                LOG.warn("{} unexpected exception tracking completion of async add of {}", this, message.getMessageId(), e);
-                            }
-                        }
-                    }));
+                    indexListener.onAdd(new IndexListener.MessageContext(context, message, null));
                 }
                 return future;
             } else {
@@ -442,7 +417,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter,
         @Override
         public void addMessage(final ConnectionContext context, final Message message) throws IOException {
-            KahaAddMessageCommand command = new KahaAddMessageCommand();
+            final KahaAddMessageCommand command = new KahaAddMessageCommand();
             command.setDestination(dest);
             command.setMessageId(message.getMessageId().toProducerKey());
             command.setTransactionInfo(TransactionIdConversion.convert(transactionIdTransformer.transform(message.getTransactionId())));
@@ -450,25 +425,25 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter,
             command.setPrioritySupported(isPrioritizedMessages());
             org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(message);
             command.setMessage(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
             store(command, isEnableJournalDiskSyncs() && message.isResponseRequired(), new IndexAware() {
+                // sync add? (for async, future present from getFutureOrSequenceLong)
+                Object possibleFuture = message.getMessageId().getFutureOrSequenceLong();
                 @Override
                 public void sequenceAssignedWithIndexLocked(final long sequence) {
-                    final Object possibleFuture = message.getMessageId().getFutureOrSequenceLong();
                     message.getMessageId().setFutureOrSequenceLong(sequence);
                     if (indexListener != null) {
-                        trackPendingAdd(dest, sequence);
                         if (possibleFuture == null) {
-                            // sync add (for async future present from getFutureOrSequenceLong)
+                            trackPendingAdd(dest, sequence);
                             indexListener.onAdd(new IndexListener.MessageContext(context, message, new Runnable() {
                                 @Override
                                 public void run() {
                                     trackPendingAddComplete(dest, sequence);
                                 }
                             }));
                         }
                     }
                 }
             }, null);
         }
@@ -680,32 +655,23 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter,
         @Override
         public void setBatch(final MessageId identity) throws IOException {
-            try {
-                final String key = identity.toProducerKey();
-                lockAsyncJobQueue();
-                // Hopefully one day the page file supports concurrent read
-                // operations... but for now we must externally synchronize...
-                indexLock.writeLock().lock();
-                try {
-                    pageFile.tx().execute(new Transaction.Closure<IOException>() {
-                        @Override
-                        public void execute(Transaction tx) throws IOException {
-                            StoredDestination sd = getStoredDestination(dest, tx);
-                            Long location = sd.messageIdIndex.get(tx, key);
-                            if (location != null) {
-                                sd.orderIndex.setBatch(tx, location);
-                            } else {
-                                LOG.warn("{} Location {} not found in order index for {}", this, identity.getFutureOrSequenceLong(), identity);
-                            }
-                        }
-                    });
-                } finally {
-                    indexLock.writeLock().unlock();
-                }
-            } finally {
-                unlockAsyncJobQueue();
-            }
+            final String key = identity.toProducerKey();
+            indexLock.writeLock().lock();
+            try {
+                pageFile.tx().execute(new Transaction.Closure<IOException>() {
+                    @Override
+                    public void execute(Transaction tx) throws IOException {
+                        StoredDestination sd = getStoredDestination(dest, tx);
+                        Long location = sd.messageIdIndex.get(tx, key);
+                        if (location != null) {
+                            sd.orderIndex.setBatch(tx, location);
+                        } else {
+                            LOG.warn("{} {} setBatch failed, location for {} not found in messageId index for {}", this, dest.getName(), identity.getFutureOrSequenceLong(), identity);
+                        }
+                    }
+                });
+            } finally {
+                indexLock.writeLock().unlock();
+            }
         }
@@ -723,7 +689,9 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter,
     protected void lockAsyncJobQueue() {
         try {
-            this.localDestinationSemaphore.tryAcquire(this.maxAsyncJobs, 60, TimeUnit.SECONDS);
+            if (!this.localDestinationSemaphore.tryAcquire(this.maxAsyncJobs, 60, TimeUnit.SECONDS)) {
+                throw new TimeoutException(this +" timeout waiting for localDestSem:" + this.localDestinationSemaphore);
+            }
         } catch (Exception e) {
             LOG.error("Failed to lock async jobs for " + this.destination, e);
         }


@@ -256,11 +256,6 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
     private boolean enableIndexPageCaching = true;
     ReentrantReadWriteLock checkpointLock = new ReentrantReadWriteLock();
-    interface SerialExecution<V> {
-        public V execute(Callable<V> c) throws Exception;
-    }
-    SerialExecution<Location> serialExecutor;
     @Override
     public void doStart() throws Exception {
         load();
@@ -962,20 +957,6 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
         return store(data, sync, before, after, null);
     }
-    public Location store(final KahaCommitCommand data, final boolean sync, final IndexAware before, final Runnable after) throws IOException {
-        try {
-            return serialExecutor.execute(new Callable<Location>() {
-                @Override
-                public Location call() throws Exception {
-                    return store(data, sync, before, after, null);
-                }
-            });
-        } catch (Exception e) {
-            LOG.error("Failed to execute commit", e);
-            throw new IOException(e);
-        }
-    }
     /**
      * All updated are are funneled through this method. The updates are converted
      * to a JournalMessage which is logged to the journal and then the data from
@@ -1259,24 +1240,19 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
             }
             final List<Operation> messagingTx = inflightTx;
-            try {
-                indexLock.writeLock().lock();
-                try {
-                    pageFile.tx().execute(new Transaction.Closure<IOException>() {
-                        @Override
-                        public void execute(Transaction tx) throws IOException {
-                            for (Operation op : messagingTx) {
-                                op.execute(tx);
-                            }
-                        }
-                    });
-                    metadata.lastUpdate = location;
-                } finally {
-                    indexLock.writeLock().unlock();
-                }
-            } catch (Exception e) {
-                LOG.error("serial execution of commit failed", e);
-                throw new IOException(e);
-            }
+            indexLock.writeLock().lock();
+            try {
+                pageFile.tx().execute(new Transaction.Closure<IOException>() {
+                    @Override
+                    public void execute(Transaction tx) throws IOException {
+                        for (Operation op : messagingTx) {
+                            op.execute(tx);
+                        }
+                    }
+                });
+                metadata.lastUpdate = location;
+            } finally {
+                indexLock.writeLock().unlock();
+            }
         }


@@ -557,7 +557,6 @@
           <artifactId>maven-surefire-plugin</artifactId>
           <configuration>
             <excludes combine.children="append">
-              <exclude>**/AMQ4485LowLimitTest.*</exclude>
               <!-- http://jira.activemq.org/jira/browse/AMQ-626 -->
              <exclude>**/MultipleTestsWithSpringFactoryBeanTest.*</exclude>
              <exclude>**/MultipleTestsWithXBeanFactoryBeanTest.*</exclude>


@@ -242,7 +242,7 @@ public class AMQ4485LowLimitTest extends JmsMultipleBrokersTestSupport {
             if (tally.accumulator.get() != expected) {
                 LOG.info("Tally for: " + tally.brokerName + ", dest: " + tally.destination + " - " + tally.accumulator.get() + " != " + expected + ", " + tally.expected);
                 if (tally.accumulator.get() > expected - 50) {
-                    dumpQueueStat(tally.destination);
+                    dumpQueueStat(null);
                 }
                 if (tally.expected.size() == 1) {
                     startConsumer(tally.brokerName, tally.destination);
@@ -260,6 +260,9 @@ public class AMQ4485LowLimitTest extends JmsMultipleBrokersTestSupport {
         LOG.info("done");
         long duration = System.currentTimeMillis() - startTime;
         LOG.info("Duration:" + TimeUtils.printDuration(duration));
+        assertEquals("nothing in the dlq's", 0, dumpQueueStat(new ActiveMQQueue("ActiveMQ.DLQ")));
     }
@@ -273,17 +276,20 @@ public class AMQ4485LowLimitTest extends JmsMultipleBrokersTestSupport {
         queueConnection.close();
     }
-    private void dumpQueueStat(ActiveMQDestination destination) throws Exception {
+    private long dumpQueueStat(ActiveMQDestination destination) throws Exception {
+        long sumTotal = 0;
         Collection<BrokerItem> brokerList = brokers.values();
         for (Iterator<BrokerItem> i = brokerList.iterator(); i.hasNext(); ) {
             BrokerService brokerService = i.next().broker;
             for (ObjectName objectName : brokerService.getAdminView().getQueues()) {
-                //if (objectName.toString().contains(destination.getQualifiedName())) {
+                if (destination != null && objectName.toString().contains(destination.getPhysicalName())) {
                     QueueViewMBean qViewMBean = (QueueViewMBean) brokerService.getManagementContext().newProxyInstance(objectName, QueueViewMBean.class, false);
-                    LOG.info(brokerService.getBrokerName() + ", " + qViewMBean.getName() + " Size: " + qViewMBean.getEnqueueCount());
-                //}
+                    LOG.info(brokerService.getBrokerName() + ", " + qViewMBean.getName() + ", Enqueue:" + qViewMBean.getEnqueueCount() + ", Size: " + qViewMBean.getQueueSize());
+                    sumTotal += qViewMBean.getQueueSize();
+                }
             }
         }
+        return sumTotal;
     }
     private void startAllGWFanoutConsumers(int nBrokers) throws Exception {
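
For reference, the reworked dumpQueueStat above sums the standard QueueViewMBean counters. A small standalone sketch of the same JMX lookup against an embedded broker (the helper class and method names are illustrative; the MBean calls are the ones the test uses):

import javax.management.ObjectName;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.jmx.QueueViewMBean;

public class QueueDepthSketch {
    // Sum the current depth of every queue hosted by an already-started broker,
    // using the same QueueViewMBean proxy lookup the test relies on.
    static long totalQueueDepth(BrokerService brokerService) throws Exception {
        long sumTotal = 0;
        for (ObjectName objectName : brokerService.getAdminView().getQueues()) {
            QueueViewMBean qViewMBean = (QueueViewMBean) brokerService.getManagementContext()
                    .newProxyInstance(objectName, QueueViewMBean.class, false);
            sumTotal += qViewMBean.getQueueSize();
        }
        return sumTotal;
    }
}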


@@ -42,6 +42,7 @@ import org.apache.activemq.broker.region.policy.PolicyMap;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.network.ConditionalNetworkBridgeFilterFactory;
 import org.apache.activemq.network.NetworkConnector;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
 import org.apache.activemq.util.Wait;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -456,6 +457,9 @@ public class TwoBrokerQueueClientsReconnectTest extends JmsMultipleBrokersTestSu
         BrokerService brokerService = brokers.get(broker2).broker;
         brokerService.setPersistent(true);
         brokerService.setDeleteAllMessagesOnStartup(true);
+        // disable concurrent dispatch otherwise store duplicate suppression will be skipped b/c cursor audit is already
+        // disabled so verification of stats will fail - ie: duplicate will be dispatched
+        ((KahaDBPersistenceAdapter)brokerService.getPersistenceAdapter()).setConcurrentStoreAndDispatchQueues(false);
         brokerService.setPlugins(new BrokerPlugin[]{
             new BrokerPluginSupport() {
                 @Override