Improve concurrency by actively managing AbstractBrokerFactory's broker set when using non-finalizing brokers. Credit goes to Arunabh Hazarika for identifying the bottleneck and prototyping this solution.

git-svn-id: https://svn.apache.org/repos/asf/openjpa/branches/1.1.x@653000 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Patrick Linskey 2008-05-03 04:41:48 +00:00
parent 148bb16613
commit 53da9d32ab
4 changed files with 60 additions and 15 deletions

View File

@ -52,16 +52,32 @@ public class BrokerValue
public Object newInstance(String clsName, Class type, Configuration conf,
boolean fatal) {
// This is not synchronized. If there are concurrent invocations
// while _templateBroker is null, we'll just end up with extra
// template brokers, which will get safely garbage collected.
if (_templateBroker == null)
_templateBroker = (BrokerImpl) super.newInstance(clsName, type,
conf, fatal);
getTemplateBroker(clsName, type, conf, fatal);
try {
return _templateBroker.clone();
} catch (CloneNotSupportedException e) {
throw new InternalException(e);
}
}
public Class<? extends BrokerImpl> getTemplateBrokerType(Configuration c) {
return getTemplateBroker(getClassName(), BrokerImpl.class, c, true)
.getClass();
}
private BrokerImpl getTemplateBroker(String clsName, Class type,
Configuration conf, boolean fatal) {
if (clsName == null || !clsName.equals(getClassName()))
throw new IllegalArgumentException("clsName != configured value '"
+ getClassName() + "'");
// This is not synchronized. If there are concurrent invocations
// while _templateBroker is null, we'll just end up with extra
// template brokers, which will get safely garbage collected.
if (_templateBroker == null)
_templateBroker = (BrokerImpl) super.newInstance(clsName, type,
conf, fatal);
return _templateBroker;
}
}

View File

@ -76,7 +76,7 @@ public class OpenJPAConfigurationImpl
// openjpa properties
public ObjectValue classResolverPlugin;
public ObjectValue brokerPlugin;
public BrokerValue brokerPlugin;
public ObjectValue dataCachePlugin;
public ObjectValue dataCacheManagerPlugin;
public IntValue dataCacheTimeout;

View File

@ -19,7 +19,6 @@
package org.apache.openjpa.kernel;
import java.io.ObjectStreamException;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@ -29,14 +28,18 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import javax.transaction.Status;
import javax.transaction.Synchronization;
import javax.transaction.Transaction;
import javax.transaction.TransactionManager;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.collections.set.MapBackedSet;
import org.apache.openjpa.conf.OpenJPAConfiguration;
import org.apache.openjpa.conf.OpenJPAVersion;
import org.apache.openjpa.conf.BrokerValue;
import org.apache.openjpa.conf.OpenJPAConfigurationImpl;
import org.apache.openjpa.datacache.DataCacheStoreManager;
import org.apache.openjpa.ee.ManagedRuntime;
import org.apache.openjpa.enhance.PCRegistry;
@ -48,14 +51,12 @@ import org.apache.openjpa.lib.conf.Configuration;
import org.apache.openjpa.lib.conf.Configurations;
import org.apache.openjpa.lib.log.Log;
import org.apache.openjpa.lib.util.J2DoPrivHelper;
import org.apache.openjpa.lib.util.JavaVersions;
import org.apache.openjpa.lib.util.Localizer;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.openjpa.lib.util.concurrent.ConcurrentReferenceHashSet;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.openjpa.meta.MetaDataRepository;
import org.apache.openjpa.util.GeneralException;
import org.apache.openjpa.util.InternalException;
import org.apache.openjpa.util.InvalidStateException;
import org.apache.openjpa.util.OpenJPAException;
import org.apache.openjpa.util.UserException;
@ -90,8 +91,7 @@ public abstract class AbstractBrokerFactory
= new ConcurrentHashMap();
// weak-ref tracking of open brokers
private transient Collection _brokers = new ConcurrentReferenceHashSet
(ConcurrentReferenceHashSet.WEAK);
private transient Set _brokers;
// cache the class names loaded from the persistent classes property so
// that we can re-load them for each new broker
@ -145,6 +145,7 @@ public abstract class AbstractBrokerFactory
*/
protected AbstractBrokerFactory(OpenJPAConfiguration config) {
_conf = config;
_brokers = newBrokerSet();
getPcClassLoaders();
}
@ -384,7 +385,7 @@ public abstract class AbstractBrokerFactory
Broker broker;
for (Iterator itr = _brokers.iterator(); itr.hasNext();) {
broker = (Broker) itr.next();
// Check for null because _brokers contains weak references
// Check for null because _brokers may contain weak references
if ((broker != null) && (!broker.isClosed()))
broker.close();
}
@ -464,13 +465,29 @@ public abstract class AbstractBrokerFactory
// reset these transient fields to empty values
_transactional = new ConcurrentHashMap();
_brokers = new ConcurrentReferenceHashSet(
ConcurrentReferenceHashSet.WEAK);
_brokers = newBrokerSet();
makeReadOnly();
return this;
}
private Set newBrokerSet() {
BrokerValue bv;
if (_conf instanceof OpenJPAConfigurationImpl)
bv = ((OpenJPAConfigurationImpl) _conf).brokerPlugin;
else
bv = (BrokerValue) _conf.getValue(BrokerValue.KEY);
if (FinalizingBrokerImpl.class.isAssignableFrom(
bv.getTemplateBrokerType(_conf))) {
return MapBackedSet.decorate(new ConcurrentHashMap(),
new Object() { });
} else {
return new ConcurrentReferenceHashSet(
ConcurrentReferenceHashSet.WEAK);
}
}
////////////////////////
// Methods for Override
////////////////////////
@ -746,6 +763,16 @@ public abstract class AbstractBrokerFactory
return Collections.unmodifiableCollection(_brokers);
}
/**
* Release <code>broker</code> from any internal data structures. This
* is invoked by <code>broker</code> after the broker is fully closed.
*
* @since 1.1.0
*/
protected void releaseBroker(BrokerImpl broker) {
_brokers.remove(broker);
}
/**
* @return a key that can be used to obtain this broker factory from the
* pool at a later time.

View File

@ -4137,6 +4137,8 @@ public class BrokerImpl
if (_log.isTraceEnabled())
_closedException = new IllegalStateException();
_factory.releaseBroker(this);
if (err != null)
throw err;
}