diff --git a/openjpa-kernel/src/main/java/org/apache/openjpa/conf/BrokerValue.java b/openjpa-kernel/src/main/java/org/apache/openjpa/conf/BrokerValue.java index 248d0e2b2..ee0db6ca0 100644 --- a/openjpa-kernel/src/main/java/org/apache/openjpa/conf/BrokerValue.java +++ b/openjpa-kernel/src/main/java/org/apache/openjpa/conf/BrokerValue.java @@ -52,16 +52,32 @@ public class BrokerValue public Object newInstance(String clsName, Class type, Configuration conf, boolean fatal) { - // This is not synchronized. If there are concurrent invocations - // while _templateBroker is null, we'll just end up with extra - // template brokers, which will get safely garbage collected. - if (_templateBroker == null) - _templateBroker = (BrokerImpl) super.newInstance(clsName, type, - conf, fatal); + getTemplateBroker(clsName, type, conf, fatal); + try { return _templateBroker.clone(); } catch (CloneNotSupportedException e) { throw new InternalException(e); } } + + public Class getTemplateBrokerType(Configuration c) { + return getTemplateBroker(getClassName(), BrokerImpl.class, c, true) + .getClass(); + } + + private BrokerImpl getTemplateBroker(String clsName, Class type, + Configuration conf, boolean fatal) { + if (clsName == null || !clsName.equals(getClassName())) + throw new IllegalArgumentException("clsName != configured value '" + + getClassName() + "'"); + + // This is not synchronized. If there are concurrent invocations + // while _templateBroker is null, we'll just end up with extra + // template brokers, which will get safely garbage collected. + if (_templateBroker == null) + _templateBroker = (BrokerImpl) super.newInstance(clsName, type, + conf, fatal); + return _templateBroker; + } } diff --git a/openjpa-kernel/src/main/java/org/apache/openjpa/conf/OpenJPAConfigurationImpl.java b/openjpa-kernel/src/main/java/org/apache/openjpa/conf/OpenJPAConfigurationImpl.java index e7fd3d2e4..a4a98e19d 100644 --- a/openjpa-kernel/src/main/java/org/apache/openjpa/conf/OpenJPAConfigurationImpl.java +++ b/openjpa-kernel/src/main/java/org/apache/openjpa/conf/OpenJPAConfigurationImpl.java @@ -76,7 +76,7 @@ public class OpenJPAConfigurationImpl // openjpa properties public ObjectValue classResolverPlugin; - public ObjectValue brokerPlugin; + public BrokerValue brokerPlugin; public ObjectValue dataCachePlugin; public ObjectValue dataCacheManagerPlugin; public IntValue dataCacheTimeout; diff --git a/openjpa-kernel/src/main/java/org/apache/openjpa/kernel/AbstractBrokerFactory.java b/openjpa-kernel/src/main/java/org/apache/openjpa/kernel/AbstractBrokerFactory.java index c21412335..7659a9bc5 100644 --- a/openjpa-kernel/src/main/java/org/apache/openjpa/kernel/AbstractBrokerFactory.java +++ b/openjpa-kernel/src/main/java/org/apache/openjpa/kernel/AbstractBrokerFactory.java @@ -19,7 +19,6 @@ package org.apache.openjpa.kernel; import java.io.ObjectStreamException; -import java.lang.reflect.InvocationTargetException; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -29,14 +28,18 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Properties; +import java.util.Set; import javax.transaction.Status; import javax.transaction.Synchronization; import javax.transaction.Transaction; import javax.transaction.TransactionManager; import org.apache.commons.lang.StringUtils; +import org.apache.commons.collections.set.MapBackedSet; import org.apache.openjpa.conf.OpenJPAConfiguration; import org.apache.openjpa.conf.OpenJPAVersion; +import org.apache.openjpa.conf.BrokerValue; +import org.apache.openjpa.conf.OpenJPAConfigurationImpl; import org.apache.openjpa.datacache.DataCacheStoreManager; import org.apache.openjpa.ee.ManagedRuntime; import org.apache.openjpa.enhance.PCRegistry; @@ -48,14 +51,12 @@ import org.apache.openjpa.lib.conf.Configuration; import org.apache.openjpa.lib.conf.Configurations; import org.apache.openjpa.lib.log.Log; import org.apache.openjpa.lib.util.J2DoPrivHelper; -import org.apache.openjpa.lib.util.JavaVersions; import org.apache.openjpa.lib.util.Localizer; import java.util.concurrent.ConcurrentHashMap; import org.apache.openjpa.lib.util.concurrent.ConcurrentReferenceHashSet; import java.util.concurrent.locks.ReentrantLock; import org.apache.openjpa.meta.MetaDataRepository; import org.apache.openjpa.util.GeneralException; -import org.apache.openjpa.util.InternalException; import org.apache.openjpa.util.InvalidStateException; import org.apache.openjpa.util.OpenJPAException; import org.apache.openjpa.util.UserException; @@ -90,8 +91,7 @@ public abstract class AbstractBrokerFactory = new ConcurrentHashMap(); // weak-ref tracking of open brokers - private transient Collection _brokers = new ConcurrentReferenceHashSet - (ConcurrentReferenceHashSet.WEAK); + private transient Set _brokers; // cache the class names loaded from the persistent classes property so // that we can re-load them for each new broker @@ -145,6 +145,7 @@ public abstract class AbstractBrokerFactory */ protected AbstractBrokerFactory(OpenJPAConfiguration config) { _conf = config; + _brokers = newBrokerSet(); getPcClassLoaders(); } @@ -384,7 +385,7 @@ public abstract class AbstractBrokerFactory Broker broker; for (Iterator itr = _brokers.iterator(); itr.hasNext();) { broker = (Broker) itr.next(); - // Check for null because _brokers contains weak references + // Check for null because _brokers may contain weak references if ((broker != null) && (!broker.isClosed())) broker.close(); } @@ -464,13 +465,29 @@ public abstract class AbstractBrokerFactory // reset these transient fields to empty values _transactional = new ConcurrentHashMap(); - _brokers = new ConcurrentReferenceHashSet( - ConcurrentReferenceHashSet.WEAK); + _brokers = newBrokerSet(); makeReadOnly(); return this; } + private Set newBrokerSet() { + BrokerValue bv; + if (_conf instanceof OpenJPAConfigurationImpl) + bv = ((OpenJPAConfigurationImpl) _conf).brokerPlugin; + else + bv = (BrokerValue) _conf.getValue(BrokerValue.KEY); + + if (FinalizingBrokerImpl.class.isAssignableFrom( + bv.getTemplateBrokerType(_conf))) { + return MapBackedSet.decorate(new ConcurrentHashMap(), + new Object() { }); + } else { + return new ConcurrentReferenceHashSet( + ConcurrentReferenceHashSet.WEAK); + } + } + //////////////////////// // Methods for Override //////////////////////// @@ -746,6 +763,16 @@ public abstract class AbstractBrokerFactory return Collections.unmodifiableCollection(_brokers); } + /** + * Release broker from any internal data structures. This + * is invoked by broker after the broker is fully closed. + * + * @since 1.1.0 + */ + protected void releaseBroker(BrokerImpl broker) { + _brokers.remove(broker); + } + /** * @return a key that can be used to obtain this broker factory from the * pool at a later time. diff --git a/openjpa-kernel/src/main/java/org/apache/openjpa/kernel/BrokerImpl.java b/openjpa-kernel/src/main/java/org/apache/openjpa/kernel/BrokerImpl.java index f23e9e380..9f9879983 100644 --- a/openjpa-kernel/src/main/java/org/apache/openjpa/kernel/BrokerImpl.java +++ b/openjpa-kernel/src/main/java/org/apache/openjpa/kernel/BrokerImpl.java @@ -4099,6 +4099,8 @@ public class BrokerImpl if (_log.isTraceEnabled()) _closedException = new IllegalStateException(); + _factory.releaseBroker(this); + if (err != null) throw err; }