OPENJPA-515 Add basic Tests for distributed database support.

git-svn-id: https://svn.apache.org/repos/asf/openjpa/trunk@627636 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Pinaki Poddar 2008-02-14 02:26:59 +00:00
parent 07d26ee05a
commit eaf452dcc2
24 changed files with 1524 additions and 70 deletions

View File

@ -32,6 +32,13 @@
<artifactId>openjpa-parent</artifactId> <artifactId>openjpa-parent</artifactId>
<version>1.1.0-SNAPSHOT</version> <version>1.1.0-SNAPSHOT</version>
</parent> </parent>
<properties>
<openjpa.loglevel>INFO</openjpa.loglevel>
<!-- to set debug arguments, you might set the following at the command line:
-Dtest.jvm.arguments="-Xmx500m -agentlib:jdwp=transport=dt_socket,server=y,address=8000"
-->
<test.jvm.arguments>-Xmx500m</test.jvm.arguments>
</properties>
<dependencies> <dependencies>
<dependency> <dependency>
<groupId>org.apache.openjpa</groupId> <groupId>org.apache.openjpa</groupId>
@ -45,7 +52,68 @@
<version>${pom.version}</version> <version>${pom.version}</version>
<scope>compile</scope> <scope>compile</scope>
</dependency> </dependency>
<dependency>
<groupId>org.apache.geronimo.specs</groupId>
<artifactId>geronimo-jpa_3.0_spec</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.openjpa</groupId>
<artifactId>openjpa-persistence</artifactId>
<version>${pom.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.5</version>
<scope>test</scope>
</dependency>
</dependencies> </dependencies>
<profiles>
<!-- Profile for testing with Apache Derby -->
<profile>
<id>test-derby</id>
<activation>
<activeByDefault>false</activeByDefault>
<property><name>test-derby</name></property>
</activation>
<dependencies>
<dependency>
<groupId>org.apache.derby</groupId>
<artifactId>derby</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<properties>
<connection.driver.name>org.apache.derby.jdbc.EmbeddedDriver</connection.driver.name>
<connection.url>jdbc:derby:target/database/slice-derby-One;create=true</connection.url>
<connection.username></connection.username>
<connection.password></connection.password>
</properties>
</profile>
<profile>
<id>test-mysql</id>
<activation>
<activeByDefault>false</activeByDefault>
<property><name>test-mysql</name></property>
</activation>
<dependencies>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.5</version>
</dependency>
</dependencies>
<properties>
<connection.driver.name>com.mysql.jdbc.Driver</connection.driver.name>
<connection.url>${openjpa.mysql.url}</connection.url>
<connection.username>${openjpa.mysql.username}</connection.username>
<connection.password>${openjpa.mysql.password}</connection.password>
</properties>
</profile>
</profiles>
<build> <build>
<plugins> <plugins>
<plugin> <plugin>
@ -56,6 +124,47 @@
<target>1.5</target> <target>1.5</target>
</configuration> </configuration>
</plugin> </plugin>
<plugin>
<artifactId>maven-antrun-plugin</artifactId>
<executions>
<execution>
<phase>test-compile</phase>
<configuration>
<tasks>
<ant antfile="src/main/ant/enhancer.xml"
target="enhance"
inheritRefs="true">
<property name="maven.test.skip" value="${maven.test.skip}" />
<property name="test" value="${test}" />
<property name="outdir" value="${project.build.outputDirectory}" />
<property name="project.build.testOutputDirectory" value="${project.build.testOutputDirectory}" />
<property name="openjpa.loglevel" value="${openjpa.loglevel}" />
</ant>
</tasks>
</configuration>
<goals>
<goal>run</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<!-- ======================================================= -->
<!-- All tests are excluded currently -->
<!-- ======================================================= -->
<configuration>
<includes>
<include>**/DummyTest.java</include>
</includes>
<excludes>
<exclude>**/Test*.java</exclude>
</excludes>
</configuration>
</plugin>
</plugins> </plugins>
</build> </build>
</project> </project>

View File

@ -0,0 +1,84 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<project name="enhance" basedir=".">
<!--
This build file executes the PCEnhancer on the test entities. It's in a
separate file instead of nested in pom.xml to make some conditional
processing easier.
Test classes will not be compiled if maven is invoked with
-Dmaven.tests.skip=true.
-->
<condition property="maven.test.skip.istrue">
<istrue value="${maven.test.skip}" />
</condition>
<condition property="test.isfalse">
<equals arg1="${test}" arg2="false" />
</condition>
<!-- =================================
target: enhance
================================= -->
<target name="enhance"
description="--> run the enhancer unless test=false"
unless="test.isfalse">
<antcall target="enhance.all.entities"
inheritall="true"
inheritrefs="true" />
</target>
<!-- =================================
target: enhance.all.entities
================================= -->
<target name="enhance.all.entities"
description="--> enhance the test entities"
unless="maven.test.skip.istrue">
<echo> running enhancer</echo>
<!--
Inherited references won't be present until the task is called.
Therefore the path definition needs to stay inside the task.
-->
<path id="cp">
<path refid="maven.test.classpath" />
<path refid="maven.compile.classpath" />
<path refid="maven.dependency.classpath" />
</path>
<taskdef name="openjpac"
classname="org.apache.openjpa.ant.PCEnhancerTask">
<classpath refid="cp" />
</taskdef>
<fileset id="enhance.path.ref"
dir="${project.build.testOutputDirectory}">
<include name="**/Address.class" />
<include name="**/Person.class" />
<include name="**/PObject.class" />
<exclude name="**/Test*.class" />
</fileset>
<openjpac>
<classpath refid="cp" />
<fileset refid="enhance.path.ref" />
<config log="DefaultLevel=${openjpa.loglevel}" />
</openjpac>
</target>
</project>

View File

@ -18,7 +18,7 @@
*/ */
package org.apache.openjpa.slice; package org.apache.openjpa.slice;
import org.apache.openjpa.kernel.BrokerImpl; import org.apache.openjpa.kernel.FinalizingBrokerImpl;
import org.apache.openjpa.kernel.OpCallbacks; import org.apache.openjpa.kernel.OpCallbacks;
import org.apache.openjpa.kernel.OpenJPAStateManager; import org.apache.openjpa.kernel.OpenJPAStateManager;
import org.apache.openjpa.lib.util.Localizer; import org.apache.openjpa.lib.util.Localizer;
@ -35,7 +35,7 @@ import org.apache.openjpa.util.UserException;
* *
*/ */
@SuppressWarnings("serial") @SuppressWarnings("serial")
public class DistributedBrokerImpl extends BrokerImpl { public class DistributedBrokerImpl extends FinalizingBrokerImpl {
private transient String slice; private transient String slice;
private static final Localizer _loc = private static final Localizer _loc =
@ -79,4 +79,14 @@ public class DistributedBrokerImpl extends BrokerImpl {
slice, pc, conf.getActiveSliceNames() })); slice, pc, conf.getActiveSliceNames() }));
return slice; return slice;
} }
@Override
public boolean endOperation() {
try {
return super.endOperation();
} catch (Exception ex) {
}
return true;
}
} }

View File

@ -23,6 +23,7 @@ import java.util.List;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -97,7 +98,7 @@ public class ExecutorServiceValue extends PluginValue {
} }
obj = new ThreadPoolExecutor(defaultSize, defaultSize, obj = new ThreadPoolExecutor(defaultSize, defaultSize,
keepAliveTime, TimeUnit.SECONDS, keepAliveTime, TimeUnit.SECONDS,
new PriorityBlockingQueue<Runnable>(), factory); new SynchronousQueue<Runnable>(), factory);
Configurations.configureInstance(obj, conf, opts, getProperty()); Configurations.configureInstance(obj, conf, opts, getProperty());
} }

View File

@ -37,7 +37,6 @@ import org.apache.openjpa.jdbc.conf.JDBCConfigurationImpl;
import org.apache.openjpa.jdbc.schema.DataSourceFactory; import org.apache.openjpa.jdbc.schema.DataSourceFactory;
import org.apache.openjpa.lib.conf.BooleanValue; import org.apache.openjpa.lib.conf.BooleanValue;
import org.apache.openjpa.lib.conf.ConfigurationProvider; import org.apache.openjpa.lib.conf.ConfigurationProvider;
import org.apache.openjpa.lib.conf.Configurations;
import org.apache.openjpa.lib.conf.PluginValue; import org.apache.openjpa.lib.conf.PluginValue;
import org.apache.openjpa.lib.conf.StringListValue; import org.apache.openjpa.lib.conf.StringListValue;
import org.apache.openjpa.lib.conf.StringValue; import org.apache.openjpa.lib.conf.StringValue;
@ -55,6 +54,8 @@ import org.apache.openjpa.util.UserException;
/** /**
* Implements a distributed configuration of JDBCStoreManagers. * Implements a distributed configuration of JDBCStoreManagers.
* The original configuration properties are analyzed to create a set of
* Slice specific properties with defaulting rules.
* *
* @author Pinaki Poddar * @author Pinaki Poddar
* *
@ -67,6 +68,7 @@ public class DistributedJDBCConfigurationImpl extends JDBCConfigurationImpl
private Slice _master; private Slice _master;
private DecoratingDataSource virtualDataSource; private DecoratingDataSource virtualDataSource;
protected BooleanValue lenientPlugin; protected BooleanValue lenientPlugin;
protected StringValue masterPlugin; protected StringValue masterPlugin;
protected StringListValue namesPlugin; protected StringListValue namesPlugin;
@ -90,7 +92,6 @@ public class DistributedJDBCConfigurationImpl extends JDBCConfigurationImpl
Map p = cp.getProperties(); Map p = cp.getProperties();
String pUnit = getPersistenceUnitName(p); String pUnit = getPersistenceUnitName(p);
setDiagnosticContext(pUnit); setDiagnosticContext(pUnit);
Log log = getConfigurationLog();
brokerPlugin.setString(DistributedBrokerImpl.class.getName()); brokerPlugin.setString(DistributedBrokerImpl.class.getName());
@ -189,8 +190,6 @@ public class DistributedJDBCConfigurationImpl extends JDBCConfigurationImpl
public DistributionPolicy getDistributionPolicyInstance() { public DistributionPolicy getDistributionPolicyInstance() {
if (distributionPolicyPlugin.get() == null) { if (distributionPolicyPlugin.get() == null) {
// Configurations.getProperty(distributionPolicyPlugin.getProperty(), m)
// distributionPolicyPlugin.setString(toProperties(false).get(key))
distributionPolicyPlugin.instantiate(DistributionPolicy.class, distributionPolicyPlugin.instantiate(DistributionPolicy.class,
this, true); this, true);
} }
@ -239,6 +238,8 @@ public class DistributedJDBCConfigurationImpl extends JDBCConfigurationImpl
handleBadConnection(isLenient, slice, ex); handleBadConnection(isLenient, slice, ex);
} }
} }
if (dataSources.isEmpty())
throw new UserException(_loc.get("no-slice"));
DistributedDataSource result = new DistributedDataSource(dataSources); DistributedDataSource result = new DistributedDataSource(dataSources);
return result; return result;
} }
@ -324,29 +325,29 @@ public class DistributedJDBCConfigurationImpl extends JDBCConfigurationImpl
for (String key : sliceNames) { for (String key : sliceNames) {
JDBCConfiguration child = new JDBCConfigurationImpl(); JDBCConfiguration child = new JDBCConfigurationImpl();
child.fromProperties(createSliceProperties(original, key)); child.fromProperties(createSliceProperties(original, key));
child.setId(PREFIX_SLICE + key); child.setId(key);
Slice slice = new Slice(key, child); Slice slice = new Slice(key, child);
_slices.add(slice); _slices.add(slice);
if (log.isTraceEnabled()) if (log.isTraceEnabled())
log.trace(_loc.get("slice-configuration", key, child log.trace(_loc.get("slice-configuration", key, child
.toProperties(false))); .toProperties(false)));
} }
setMaster(); setMaster(original);
} }
/** /**
* Finds the slices. If <code>slice.Names</code> property is available * Finds the slices. If <code>openjpa.slice.Names</code> property is
* then the slices are ordered in the way they are listed. Otherwise scans * specified then the slices are ordered in the way they are listed.
* all available slices by looking for property of the form * Otherwise scans all available slices by looking for property of the form
* <code>slice.XYZ.abc</code> where <code>XYZ</code> is the slice * <code>openjpa.slice.XYZ.abc</code> where <code>XYZ</code> is the slice
* identifier and <code>abc</code> is openjpa property name. The slices * identifier and <code>abc</code> is any openjpa property name. The slices
* are then ordered alphabetically. * are then ordered alphabetically by their identifier.
*/ */
private List<String> findSlices(Map p) { private List<String> findSlices(Map p) {
List<String> sliceNames = new ArrayList<String>(); List<String> sliceNames = new ArrayList<String>();
Log log = getConfigurationLog(); Log log = getConfigurationLog();
String key = PREFIX_SLICE+namesPlugin.getProperty(); String key = PREFIX_SLICE + namesPlugin.getProperty();
boolean explicit = p.containsKey(key); boolean explicit = p.containsKey(key);
if (explicit) { if (explicit) {
String[] values = p.get(key).toString().split("\\,"); String[] values = p.get(key).toString().split("\\,");
@ -355,7 +356,7 @@ public class DistributedJDBCConfigurationImpl extends JDBCConfigurationImpl
sliceNames.add(name.trim()); sliceNames.add(name.trim());
} else { } else {
if (log.isWarnEnabled()) if (log.isWarnEnabled())
log.warn(_loc.get("no-slice-names")); log.warn(_loc.get("no-slice-names", key));
sliceNames = scanForSliceNames(p); sliceNames = scanForSliceNames(p);
Collections.sort(sliceNames); Collections.sort(sliceNames);
} }
@ -365,6 +366,12 @@ public class DistributedJDBCConfigurationImpl extends JDBCConfigurationImpl
return sliceNames; return sliceNames;
} }
/**
* Scan the given map for slice-specific property of the form
* <code>openjpa.slice.XYZ.abc</code> (while ignoring
* <code>openjpa.slice.XYZ</code> as they refer to slice-wide property)
* to determine the names of all available slices.
*/
private List<String> scanForSliceNames(Map p) { private List<String> scanForSliceNames(Map p) {
List<String> sliceNames = new ArrayList<String>(); List<String> sliceNames = new ArrayList<String>();
for (Object o : p.keySet()) { for (Object o : p.keySet()) {
@ -379,17 +386,17 @@ public class DistributedJDBCConfigurationImpl extends JDBCConfigurationImpl
return sliceNames; return sliceNames;
} }
static int getPartCount(String s) { private static int getPartCount(String s) {
return (s == null) ? 0 : s.split(REGEX_DOT).length; return (s == null) ? 0 : s.split(REGEX_DOT).length;
} }
static String chopHead(String s, String head) { private static String chopHead(String s, String head) {
if (s.startsWith(head)) if (s.startsWith(head))
return s.substring(head.length()); return s.substring(head.length());
return s; return s;
} }
static String chopTail(String s, String tail) { private static String chopTail(String s, String tail) {
int i = s.lastIndexOf(tail); int i = s.lastIndexOf(tail);
if (i == -1) if (i == -1)
return s; return s;
@ -436,14 +443,15 @@ public class DistributedJDBCConfigurationImpl extends JDBCConfigurationImpl
/** /**
* Determine the master slice. * Determine the master slice.
*/ */
private void setMaster() { private void setMaster(Map original) {
String masterSlice = masterPlugin.get(); String key = PREFIX_SLICE + masterPlugin.getProperty();
Object masterSlice = original.get(key);
Log log = getConfigurationLog(); Log log = getConfigurationLog();
List<Slice> activeSlices = getSlices(null); List<Slice> activeSlices = getSlices(null);
if (masterSlice == null || masterSlice.length() == 0) { if (masterSlice == null) {
_master = activeSlices.get(0); _master = activeSlices.get(0);
if (log.isWarnEnabled()) if (log.isWarnEnabled())
log.warn(_loc.get("no-master-slice", _master)); log.warn(_loc.get("no-master-slice", key, _master));
return; return;
} }
for (Slice slice:activeSlices) for (Slice slice:activeSlices)

View File

@ -18,6 +18,8 @@
*/ */
package org.apache.openjpa.slice.jdbc; package org.apache.openjpa.slice.jdbc;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.BitSet; import java.util.BitSet;
import java.util.Collection; import java.util.Collection;
@ -82,6 +84,7 @@ class DistributedStoreManager extends JDBCStoreManager {
private boolean isXA; private boolean isXA;
private TransactionManager _tm; private TransactionManager _tm;
private final DistributedJDBCConfiguration _conf; private final DistributedJDBCConfiguration _conf;
private boolean _active = false;
private Log _log; private Log _log;
private static final Localizer _loc = private static final Localizer _loc =
Localizer.forPackage(DistributedStoreManager.class); Localizer.forPackage(DistributedStoreManager.class);
@ -199,6 +202,9 @@ class DistributedStoreManager extends JDBCStoreManager {
} }
public void begin() { public void begin() {
if (_active)
return;
_active = true;
TransactionManager tm = getTransactionManager(); TransactionManager tm = getTransactionManager();
for (SliceStoreManager slice : _slices) { for (SliceStoreManager slice : _slices) {
try { try {
@ -239,16 +245,21 @@ class DistributedStoreManager extends JDBCStoreManager {
} }
public void close() { public void close() {
_active = false;
for (SliceStoreManager slice : _slices) for (SliceStoreManager slice : _slices)
slice.close(); slice.close();
} }
public void commit() { public void commit() {
if (!_active)
return;
TransactionManager tm = getTransactionManager(); TransactionManager tm = getTransactionManager();
try { try {
tm.commit(); tm.commit();
} catch (Exception e) { } catch (Exception e) {
throw new StoreException(e); throw new StoreException(e);
} finally {
_active = false;
} }
} }
@ -280,6 +291,7 @@ class DistributedStoreManager extends JDBCStoreManager {
return false; return false;
} }
/** /**
* Flush the given StateManagers after binning them to respective physical * Flush the given StateManagers after binning them to respective physical
* slices. * slices.
@ -421,11 +433,15 @@ class DistributedStoreManager extends JDBCStoreManager {
} }
public void rollback() { public void rollback() {
if (!_active)
return;
TransactionManager tm = getTransactionManager(); TransactionManager tm = getTransactionManager();
try { try {
tm.rollback(); tm.rollback();
} catch (Exception e) { } catch (Exception e) {
throw new StoreException(e); throw new StoreException(e);
} finally {
_active = false;
} }
} }
@ -477,6 +493,15 @@ class DistributedStoreManager extends JDBCStoreManager {
return _tm; return _tm;
} }
@Override
protected RefCountConnection connectInternal() throws SQLException {
List<Connection> list = new ArrayList<Connection>();
for (SliceStoreManager slice : _slices)
list.add(slice.getConnection());
DistributedConnection con = new DistributedConnection(list);
return new RefCountConnection(con);
}
private static class Flusher implements Callable<Collection> { private static class Flusher implements Callable<Collection> {
final SliceStoreManager store; final SliceStoreManager store;
final Collection toFlush; final Collection toFlush;

View File

@ -52,7 +52,7 @@ public class DistributedTransactionManager implements TransactionManager {
Localizer.forPackage(DistributedTransactionManager.class); Localizer.forPackage(DistributedTransactionManager.class);
public void begin() throws NotSupportedException, SystemException { public void begin() throws NotSupportedException, SystemException {
DistributedXATransaction txn = getTransaction(false); DistributedXATransaction txn = (DistributedXATransaction)getTransaction();
int i = 1; int i = 1;
Set<XAResource> resources = txn.getEnlistedResources(); Set<XAResource> resources = txn.getEnlistedResources();
for (XAResource resource : resources) { for (XAResource resource : resources) {
@ -70,22 +70,24 @@ public class DistributedTransactionManager implements TransactionManager {
public void commit() throws HeuristicMixedException, public void commit() throws HeuristicMixedException,
HeuristicRollbackException, IllegalStateException, HeuristicRollbackException, IllegalStateException,
RollbackException, SecurityException, SystemException { RollbackException, SecurityException, SystemException {
DistributedXATransaction txn = getTransaction(true); DistributedXATransaction txn = getTransactionOfCurrentThread(true);
Set<XAResource> resources = txn.getEnlistedResources(); Set<XAResource> resources = txn.getEnlistedResources();
int branchId = 1; int branchId = 1;
boolean nextPhase = true; Exception failedFirstPhase = null;
Exception failedSecondPhase = null;
for (XAResource resource : resources) { for (XAResource resource : resources) {
XID branch = txn.getXID().branch(branchId++); XID branch = txn.getXID().branch(branchId++);
try { try {
resource.end(branch, TMSUCCESS); resource.end(branch, TMSUCCESS);
resource.prepare(branch); resource.prepare(branch);
} catch (XAException e) { } catch (XAException e) {
nextPhase = false; failedFirstPhase = e;
} }
} }
branchId = 1; // reset branchId = 1; // reset
if (!nextPhase) { if (failedFirstPhase != null) {
for (XAResource resource : resources) { for (XAResource resource : resources) {
try { try {
XID branch = txn.getXID().branch(branchId++); XID branch = txn.getXID().branch(branchId++);
@ -93,20 +95,24 @@ public class DistributedTransactionManager implements TransactionManager {
} catch (XAException e) { } catch (XAException e) {
// ignore // ignore
} }
throw new SystemException(_loc.get("prepare-failed") }
.getMessage()); } else {
} branchId = 1; // reset
} for (XAResource resource : resources) {
XID branch = txn.getXID().branch(branchId++);
branchId = 1; // reset try {
for (XAResource resource : resources) { resource.commit(branch, false);
XID branch = txn.getXID().branch(branchId++); } catch (XAException e) {
try { failedSecondPhase = e;
resource.commit(branch, false); }
} catch (XAException e) {
throw new SystemException(e.getMessage());
} }
} }
txn.commit();
txns.set(null);
if (failedFirstPhase != null) {
throw new SystemException(failedFirstPhase.getMessage());
} else if (failedSecondPhase != null)
throw new SystemException(failedSecondPhase.getMessage());
} }
public int getStatus() throws SystemException { public int getStatus() throws SystemException {
@ -114,7 +120,10 @@ public class DistributedTransactionManager implements TransactionManager {
} }
public Transaction getTransaction() throws SystemException { public Transaction getTransaction() throws SystemException {
return getTransaction(false); DistributedXATransaction txn = getTransactionOfCurrentThread(false);
if (txn == null)
txn = newTransaction();
return txn;
} }
public void resume(Transaction arg0) throws IllegalStateException, public void resume(Transaction arg0) throws IllegalStateException,
@ -124,7 +133,9 @@ public class DistributedTransactionManager implements TransactionManager {
public void rollback() throws IllegalStateException, SecurityException, public void rollback() throws IllegalStateException, SecurityException,
SystemException { SystemException {
DistributedXATransaction txn = getTransaction(true); DistributedXATransaction txn = getTransactionOfCurrentThread(true);
if (txn == null)
return;
Set<XAResource> slices = txn.getEnlistedResources(); Set<XAResource> slices = txn.getEnlistedResources();
int branchId = 1; int branchId = 1;
for (XAResource slice : slices) { for (XAResource slice : slices) {
@ -135,6 +146,8 @@ public class DistributedTransactionManager implements TransactionManager {
} catch (XAException e) { } catch (XAException e) {
} }
} }
txn.rollback();
txns.set(null);
} }
public void setRollbackOnly() throws IllegalStateException, SystemException { public void setRollbackOnly() throws IllegalStateException, SystemException {
@ -173,19 +186,49 @@ public class DistributedTransactionManager implements TransactionManager {
* transaction, a new transaction is created with a global identifier * transaction, a new transaction is created with a global identifier
* and associated with the current thread. * and associated with the current thread.
*/ */
DistributedXATransaction getTransaction(boolean mustExist) { // DistributedXATransaction getTransaction(boolean create, boolean mustExist) {
DistributedXATransaction txn = txns.get(); // DistributedXATransaction txn = txns.get();
if (txn == null) { // if (txn == null && mustExist) {
if (mustExist) // throw new IllegalStateException(_loc.get("no-txn-on-thread",
throw new IllegalStateException(_loc.get("no-txn-on-thread", // Thread.currentThread().getName()).getMessage());
Thread.currentThread().getName()).getMessage()); // }
byte[] global = //// if (txn != null && !mustExist) {
Long.toHexString(System.currentTimeMillis()).getBytes(); //// throw new IllegalStateException(_loc.get("txn-exists--on-thread",
XID xid = new XID(0, global, new byte[] { 0x1 }); //// txn.getXID(), Thread.currentThread().getName()).getMessage());
txn = new DistributedXATransaction(xid, this); //// }
txns.set(txn); // if (create && txn == null) {
} // }
//
//
// return txn;
// }
// DistributedXATransaction getTransactionOfCurrentThread() {
// return txns.get();
// }
DistributedXATransaction getTransactionOfCurrentThread(boolean mustExist) {
DistributedXATransaction txn = txns.get();
if (txn == null && mustExist)
throw new IllegalStateException(_loc.get("no-txn-on-thread",
Thread.currentThread().getName()).getMessage());
return txn; return txn;
} }
DistributedXATransaction newTransaction() {
DistributedXATransaction txn = getTransactionOfCurrentThread(false);
if (txn != null)
throw new IllegalStateException(_loc.get("txn-exists-on-thread",
txn.getXID(), Thread.currentThread().getName()).getMessage());
byte[] global =
Long.toHexString(System.currentTimeMillis()).getBytes();
XID xid = new XID(0, global, new byte[] { 0x1 });
txn = new DistributedXATransaction(xid, this);
txns.set(txn);
return txn;
}
} }

View File

@ -25,6 +25,7 @@ import java.util.Set;
import javax.transaction.HeuristicMixedException; import javax.transaction.HeuristicMixedException;
import javax.transaction.HeuristicRollbackException; import javax.transaction.HeuristicRollbackException;
import javax.transaction.RollbackException; import javax.transaction.RollbackException;
import javax.transaction.Status;
import javax.transaction.Synchronization; import javax.transaction.Synchronization;
import javax.transaction.SystemException; import javax.transaction.SystemException;
import javax.transaction.Transaction; import javax.transaction.Transaction;
@ -39,13 +40,11 @@ import javax.transaction.xa.XAResource;
* *
*/ */
class DistributedXATransaction implements Transaction { class DistributedXATransaction implements Transaction {
private static ThreadLocal<Transaction> _trans = new ThreadLocal<Transaction>();
private Set<XAResource> _slices = new HashSet<XAResource>(); private Set<XAResource> _slices = new HashSet<XAResource>();
private Set<Synchronization> _syncs = new HashSet<Synchronization>(); private Set<Synchronization> _syncs = new HashSet<Synchronization>();
private final TransactionManager _tm; // private final TransactionManager _tm;
private final XID xid; private final XID xid;
private int _status; private int _status;
private boolean _rollbackOnly;
/** /**
* Construct with * Construct with
@ -54,7 +53,7 @@ class DistributedXATransaction implements Transaction {
*/ */
DistributedXATransaction(XID xid, TransactionManager tm) { DistributedXATransaction(XID xid, TransactionManager tm) {
this.xid = xid; this.xid = xid;
this._tm = tm; _status = Status.STATUS_ACTIVE;
} }
public XID getXID() { public XID getXID() {
@ -64,7 +63,8 @@ class DistributedXATransaction implements Transaction {
public void commit() throws HeuristicMixedException, public void commit() throws HeuristicMixedException,
HeuristicRollbackException, RollbackException, SecurityException, HeuristicRollbackException, RollbackException, SecurityException,
SystemException { SystemException {
_tm.commit(); _status = Status.STATUS_COMMITTED;
_slices.clear();
} }
public boolean delistResource(XAResource arg0, int arg1) public boolean delistResource(XAResource arg0, int arg1)
@ -87,11 +87,12 @@ class DistributedXATransaction implements Transaction {
} }
public void rollback() throws IllegalStateException, SystemException { public void rollback() throws IllegalStateException, SystemException {
_tm.rollback(); _status = Status.STATUS_ROLLEDBACK;
_slices.clear();
} }
public void setRollbackOnly() throws IllegalStateException, SystemException { public void setRollbackOnly() throws IllegalStateException, SystemException {
_rollbackOnly = true; _status = Status.STATUS_MARKED_ROLLBACK;
} }
Set<XAResource> getEnlistedResources() { Set<XAResource> getEnlistedResources() {

View File

@ -18,6 +18,7 @@
*/ */
package org.apache.openjpa.slice.transaction; package org.apache.openjpa.slice.transaction;
import java.sql.SQLException;
import java.util.Set; import java.util.Set;
import javax.transaction.HeuristicMixedException; import javax.transaction.HeuristicMixedException;
@ -60,7 +61,12 @@ public class NaiveTransactionManager implements TransactionManager {
DistributedNaiveTransaction txn = getTransaction(false); DistributedNaiveTransaction txn = getTransaction(false);
Set<SliceStoreManager> slices = txn.getEnlistedResources(); Set<SliceStoreManager> slices = txn.getEnlistedResources();
for (SliceStoreManager slice : slices) { for (SliceStoreManager slice : slices) {
slice.commit(); try {
if (!slice.getConnection().getAutoCommit())
slice.commit();
} catch (SQLException e) {
e.printStackTrace();
}
} }
} }
@ -82,7 +88,12 @@ public class NaiveTransactionManager implements TransactionManager {
DistributedNaiveTransaction txn = getTransaction(false); DistributedNaiveTransaction txn = getTransaction(false);
Set<SliceStoreManager> slices = txn.getEnlistedResources(); Set<SliceStoreManager> slices = txn.getEnlistedResources();
for (SliceStoreManager slice : slices) { for (SliceStoreManager slice : slices) {
slice.commit(); try {
if (!slice.getConnection().getAutoCommit())
slice.rollback();
} catch (SQLException e) {
e.printStackTrace();
}
} }
} }

View File

@ -50,10 +50,11 @@ slice-xa-disabled: Not all active slices "{0}" is XA-complaint and hence store \
two-phase: "{3}".{0}"(xid=[{4}]] Connection={1} XAConnection={2} two-phase: "{3}".{0}"(xid=[{4}]] Connection={1} XAConnection={2}
factory-init: Starting OpenJPA Slice {0} factory-init: Starting OpenJPA Slice {0}
config-init: Configuring Slice {0} config-init: Configuring Slice {0}
no-slice-names: Slice identifiers are not listed in [slice.Names] property. \ no-slice-names: Slice identifiers are not explicitly listed via "{0}" property.\
The configuration will be scanned to determine slice identifiers. The configuration will be scanned to determine slice identifiers.
no-slice: No slices are configured or available
no-master-slice: No master slice has been configured explicitly in \ no-master-slice: No master slice has been configured explicitly in \
[slice.Master] property. The first slice "{0}" in the list of configured \ "{0}" property. The first slice "{1}" in the list of configured \
slices will be used as master. slices will be used as master.
resource-xa-tm-not-2pc: All slices is using XA-complaint driver but the \ resource-xa-tm-not-2pc: All slices is using XA-complaint driver but the \
configured "{0}" transaction manager is not capable of enlisting XA-aware \ configured "{0}" transaction manager is not capable of enlisting XA-aware \

View File

@ -15,5 +15,5 @@
# specific language governing permissions and limitations # specific language governing permissions and limitations
# under the License. # under the License.
no-txn-on-thread: No transaction is associated with current thread "{0}" no-txn-on-thread: No transaction is associated with current thread "{0}"
prepare-failed: one or more XA-complaint resources have failed to prepare for \ prepare-failed: One or more XA-complaint resources have failed to prepare for \
commit during the first phase of a two-phase commit protocol. commit during the first phase of a two-phase commit protocol due to {0}

View File

@ -0,0 +1,82 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.openjpa.slice;
import javax.persistence.*;
@Entity
public class Address {
@Id
@GeneratedValue
private long id;
private String city;
private int zip;
@OneToOne(mappedBy = "address")
Person owner;
@Version
private long version;
public long getVersion() {
return version;
}
public Address() {
this("?", 0);
}
public Address(String city, int zip) {
setCity(city);
setZip(zip);
}
public String getCity() {
return city;
}
public void setCity(String city) {
this.city = city;
}
public int getZip() {
return zip;
}
public void setZip(int zip) {
this.zip = zip;
}
public Person getOwner() {
return owner;
}
public void setOwner(Person owner) {
this.owner = owner;
}
public long getId() {
return id;
}
public String toString() {
return city;
}
}

View File

@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.openjpa.slice;
import javax.persistence.Entity;
import javax.persistence.Id;
@Entity
public class PObject {
@Id
private long id;
private int value;
public PObject() {
this(System.currentTimeMillis());
}
public PObject(long id) {
this.id = id;
}
public long getId() {
return id;
}
public int getValue() {
return value;
}
public void setValue(int i) {
value = i;
}
}

View File

@ -0,0 +1,243 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.openjpa.slice;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.HashMap;
import javax.persistence.EntityManager;
import javax.persistence.EntityManagerFactory;
import javax.persistence.Persistence;
import junit.framework.TestCase;
import junit.framework.TestResult;
import org.apache.openjpa.kernel.AbstractBrokerFactory;
import org.apache.openjpa.kernel.Broker;
import org.apache.openjpa.meta.ClassMetaData;
import org.apache.openjpa.persistence.OpenJPAEntityManagerFactorySPI;
import org.apache.openjpa.persistence.JPAFacadeHelper;
/**
* Base test class providing persistence utilities.
*/
public abstract class PersistenceTestCase
extends TestCase {
/**
* Marker object you an pass to {@link #setUp} to indicate that the
* database tables should be cleared.
*/
protected static final Object CLEAR_TABLES = new Object();
/**
* The {@link TestResult} instance for the current test run.
*/
protected TestResult testResult;
/**
* Create an entity manager factory. Put {@link #CLEAR_TABLES} in
* this list to tell the test framework to delete all table contents
* before running the tests.
*
* @param props list of persistent types used in testing and/or
* configuration values in the form key,value,key,value...
*/
protected OpenJPAEntityManagerFactorySPI createEMF(Object... props) {
return createNamedEMF(getPersistenceUnitName(), props);
}
/**
* The name of the persistence unit that this test class should use
* by default. This defaults to "test".
*/
protected String getPersistenceUnitName() {
return "test";
}
/**
* Create an entity manager factory for persistence unit <code>pu</code>.
* Put {@link #CLEAR_TABLES} in
* this list to tell the test framework to delete all table contents
* before running the tests.
*
* @param props list of persistent types used in testing and/or
* configuration values in the form key,value,key,value...
*/
protected OpenJPAEntityManagerFactorySPI createNamedEMF(String pu,
Object... props) {
Map map = new HashMap(System.getProperties());
List<Class> types = new ArrayList<Class>();
boolean prop = false;
for (int i = 0; i < props.length; i++) {
if (prop) {
map.put(props[i - 1], props[i]);
prop = false;
} else if (props[i] == CLEAR_TABLES) {
map.put("openjpa.jdbc.SynchronizeMappings",
"buildSchema(ForeignKeys=true,"
+ "SchemaAction='add,deleteTableContents')");
} else if (props[i] instanceof Class)
types.add((Class) props[i]);
else if (props[i] != null)
prop = true;
}
if (!types.isEmpty()) {
StringBuffer buf = new StringBuffer();
for (Class c : types) {
if (buf.length() > 0)
buf.append(";");
buf.append(c.getName());
}
map.put("openjpa.MetaDataFactory",
"jpa(Types=" + buf.toString() + ")");
}
return (OpenJPAEntityManagerFactorySPI) Persistence.
createEntityManagerFactory(pu, map);
}
@Override
public void run(TestResult testResult) {
this.testResult = testResult;
super.run(testResult);
}
@Override
public void tearDown() throws Exception {
try {
super.tearDown();
} catch (Exception e) {
// if a test failed, swallow any exceptions that happen
// during tear-down, as these just mask the original problem.
if (testResult.wasSuccessful())
throw e;
}
}
/**
* Safely close the given factory.
*/
protected boolean closeEMF(EntityManagerFactory emf) {
if (emf == null || !emf.isOpen())
return false;
closeAllOpenEMs(emf);
emf.close();
return !emf.isOpen();
}
/**
* Closes all open entity managers after first rolling back any open transactions
*/
protected void closeAllOpenEMs(EntityManagerFactory emf) {
if (emf == null || !emf.isOpen())
return;
for (Iterator iter = ((AbstractBrokerFactory) JPAFacadeHelper
.toBrokerFactory(emf)).getOpenBrokers().iterator();
iter.hasNext(); ) {
Broker b = (Broker) iter.next();
if (b != null && !b.isClosed()) {
EntityManager em = JPAFacadeHelper.toEntityManager(b);
if (em.getTransaction().isActive())
em.getTransaction().rollback();
em.close();
}
}
}
/**
* Delete all instances of the given types using bulk delete queries,
* but do not close any open entity managers.
*/
protected void clear(EntityManagerFactory emf, Class... types) {
if (emf == null || types.length == 0)
return;
List<ClassMetaData> metas = new ArrayList<ClassMetaData>(types.length);
for (Class c : types) {
ClassMetaData meta = JPAFacadeHelper.getMetaData(emf, c);
if (meta != null)
metas.add(meta);
}
clear(emf, false, metas.toArray(new ClassMetaData[metas.size()]));
}
/**
* Delete all instances of the persistent types registered with the given
* factory using bulk delete queries, after first closing all open entity
* managers (and rolling back any open transactions).
*/
protected void clear(EntityManagerFactory emf) {
if (emf == null)
return;
clear(emf, true, ((OpenJPAEntityManagerFactorySPI) emf).getConfiguration().
getMetaDataRepositoryInstance().getMetaDatas());
}
/**
* Delete all instances of the given types using bulk delete queries.
* @param closeEMs TODO
*/
private void clear(EntityManagerFactory emf, boolean closeEMs, ClassMetaData... types) {
if (emf == null || types.length == 0)
return;
// prevent deadlock by closing the open entity managers
// and rolling back any open transactions
// before issuing delete statements on a new entity manager.
if (closeEMs)
closeAllOpenEMs(emf);
EntityManager em = emf.createEntityManager();
em.getTransaction().begin();
for (ClassMetaData meta : types) {
if (!meta.isMapped() || meta.isEmbeddedOnly()
|| Modifier.isAbstract(meta.getDescribedType().getModifiers()))
continue;
List all = em.createQuery("SELECT o FROM " + meta.getTypeAlias() + " o").
getResultList();
for (Object pc:all)
em.remove(pc);
}
em.getTransaction().commit();
em.close();
}
/**
* Return the entity name for the given type.
*/
protected String entityName(EntityManagerFactory emf, Class c) {
ClassMetaData meta = JPAFacadeHelper.getMetaData(emf, c);
return (meta == null) ? null : meta.getTypeAlias();
}
public static void assertNotEquals(Object o1, Object o2) {
if (o1 == o2)
fail("expected args to be different; were the same instance.");
else if (o1 == null || o2 == null)
return;
else if (o1.equals(o2))
fail("expected args to be different; compared equal.");
}
}

View File

@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.openjpa.slice;
import javax.persistence.*;
@Entity
public class Person {
@Id
@GeneratedValue
private long id;
private String name;
@Version
private long version;
@OneToOne(cascade=CascadeType.ALL)
private Address address;
public Person() {
this("?");
}
public Person(String name) {
setName(name);
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public Address getAddress() {
return address;
}
public void setAddress(Address address) {
this.address = address;
address.setOwner(this);
}
public long getId() {
return id;
}
public String toString() {
return name;
}
public long getVersion() {
return version;
}
}

View File

@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.openjpa.slice;
import org.apache.openjpa.jdbc.meta.ClassMapping;
import org.apache.openjpa.persistence.OpenJPAEntityManagerFactorySPI;
public abstract class SingleEMFTestCase
extends PersistenceTestCase {
protected OpenJPAEntityManagerFactorySPI emf;
/**
* Call {@link #setUp(Object...)} with no arguments so that the emf
* set-up happens even if <code>setUp()</code> is not called from the
* subclass.
*/
public void setUp() throws Exception {
setUp(new Object[0]);
}
/**
* Initialize entity manager factory. Put {@link #CLEAR_TABLES} in
* this list to tell the test framework to delete all table contents
* before running the tests.
*
* @param props list of persistent types used in testing and/or
* configuration values in the form key,value,key,value...
*/
protected void setUp(Object... props) {
emf = createEMF(props);
}
/**
* Closes the entity manager factory.
*/
public void tearDown() throws Exception {
super.tearDown();
if (emf == null)
return;
try {
clear(emf);
} catch (Exception e) {
// if a test failed, swallow any exceptions that happen
// during tear-down, as these just mask the original problem.
if (testResult.wasSuccessful())
throw e;
} finally {
closeEMF(emf);
}
}
protected ClassMapping getMapping(String name) {
return (ClassMapping) emf.getConfiguration()
.getMetaDataRepositoryInstance().getMetaData(name,
getClass().getClassLoader(), true);
}
}

View File

@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.openjpa.slice;
import javax.persistence.EntityManager;
import org.apache.openjpa.slice.jdbc.DistributedJDBCConfiguration;
public abstract class SliceTestCase extends SingleEMFTestCase {
protected void setUp(Object... props) {
super.setUp(props);
assertTrue(emf.getClass() + " is not a slice configuration. Check" +
" that BrokerFactory for the persistence unit is set to slice",
emf.getConfiguration() instanceof DistributedJDBCConfiguration);
}
int count(Class<?> type) {
EntityManager em = emf.createEntityManager();
String query = "SELECT COUNT(p) FROM " + type.getSimpleName() + " p";
Number number = (Number) em.createQuery(query).getSingleResult();
return number.intValue();
}
}

View File

@ -0,0 +1,161 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.openjpa.slice;
import java.util.List;
import javax.persistence.EntityManager;
public class TestBasic extends SliceTestCase {
private static String persistenceUnitName = "slice";
public void setUp() throws Exception {
super.setUp(PObject.class, Person.class, Address.class);
}
PObject persist() {
EntityManager em = emf.createEntityManager();
int value = (int)(System.currentTimeMillis()%100);
PObject pc = new PObject();
em.getTransaction().begin();
em.persist(pc);
pc.setValue(value);
em.getTransaction().commit();
em.clear();
return pc;
}
public void testDelete() {
EntityManager em = emf.createEntityManager();
em.getTransaction().begin();
List all = em.createQuery("SELECT p FROM PObject p").getResultList();
for (Object pc:all)
em.remove(pc);
em.getTransaction().commit();
int count = count(PObject.class);
assertEquals(0, count);
}
public void testBulkDelete() {
EntityManager em = emf.createEntityManager();
em.getTransaction().begin();
int c = count(PObject.class);
int d = em.createQuery("DELETE FROM PObject p").executeUpdate();
assertEquals(c, d);
em.getTransaction().commit();
c = count(PObject.class);
assertEquals(0, c);
}
/**
* Stores and finds the same object.
*/
public void testFind() {
PObject pc = persist();
int value = pc.getValue();
EntityManager em = emf.createEntityManager();
em.getTransaction().begin();
PObject pc2 = em.find(PObject.class, pc.getId());
assertNotNull(pc2);
assertNotEquals(pc, pc2);
assertEquals(pc.getId(), pc2.getId());
assertEquals(value, pc2.getValue());
}
public void testPersistIndependentObjects() {
int before = count(PObject.class);
EntityManager em = emf.createEntityManager();
int N = 2;
long start = System.currentTimeMillis();
em.getTransaction().begin();
for (int i=0; i<N; i++)
em.persist(new PObject(start++));
em.getTransaction().commit();
em.clear();
int after = count(PObject.class);
assertEquals(before+N, after);
}
public void testPersistConnectedObjectGraph() {
Person p1 = new Person("A");
Person p2 = new Person("B");
Person p3 = new Person("C");
Address a1 = new Address("Rome", 12345);
Address a2 = new Address("San Francisco", 23456);
Address a3 = new Address("New York", 34567);
p1.setAddress(a1);
p2.setAddress(a2);
p3.setAddress(a3);
EntityManager em = emf.createEntityManager();
em.getTransaction().begin();
em.persist(p1);
em.persist(p2);
em.persist(p3);
em.getTransaction().commit();
em.clear();
em = emf.createEntityManager();
em.getTransaction().begin();
List<Person> persons = em.createQuery("SELECT p FROM Person p WHERE p.name=?1").
setParameter(1, "A").getResultList();
List<Address> addresses = em.createQuery("SELECT a FROM Address a").getResultList();
for (Address pc:addresses) {
assertNotNull(pc.getCity());
assertNotNull(pc.getOwner().getName());
}
for (Person pc:persons) {
assertNotNull(pc.getName());
assertNotNull(pc.getAddress().getCity());
}
em.getTransaction().rollback();
}
/**
* Merge only works if the distribution policy assigns the correct slice
* from which the instance was fetched.
*/
public void testMerge() {
PObject pc = persist();
int value = pc.getValue();
pc.setValue(value+1);
assertNotNull(pc);
EntityManager em = emf.createEntityManager();
em.getTransaction().begin();
PObject pc2 = em.merge(pc);
em.getTransaction().commit();
em.clear();
assertNotNull(pc2);
assertNotEquals(pc, pc2);
assertEquals(pc.getId(), pc2.getId());
assertEquals(value+1, pc2.getValue());
}
protected String getPersistenceUnitName() {
return persistenceUnitName;
}
}

View File

@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.openjpa.slice;
import java.util.List;
import org.apache.openjpa.kernel.Broker;
import org.apache.openjpa.kernel.BrokerFactory;
import org.apache.openjpa.persistence.EntityManagerFactoryImpl;
import org.apache.openjpa.slice.jdbc.DistributedJDBCBrokerFactory;
import org.apache.openjpa.slice.jdbc.DistributedJDBCConfiguration;
import org.apache.openjpa.slice.transaction.NaiveTransactionManager;
/**
*
* @author Pinaki Poddar
*
*/
public class TestConfiguration extends SliceTestCase {
/**
* Tests that user-level configurations are set.
*
*/
public void testConfig() {
assertTrue(emf.getConfiguration() instanceof DistributedConfiguration);
DistributedJDBCConfiguration conf = (DistributedJDBCConfiguration)
emf.getConfiguration();
List<String> slices = conf.getAvailableSliceNames();
assertTrue(slices.size()>1);
assertTrue(slices.contains("One"));
assertTrue(slices.contains("Two"));
assertTrue(slices.contains("Three"));
assertEquals("jdbc:mysql://localhost/slice1", conf.getSlice("One").getConfiguration().getConnectionURL());
assertEquals("jdbc:mysql://localhost/slice2", conf.getSlice("Two").getConfiguration().getConnectionURL());
assertEquals("jdbc:mysql://localhost/slice3", conf.getSlice("Three").getConfiguration().getConnectionURL());
assertTrue(conf.getTransactionManagerInstance() instanceof NaiveTransactionManager);
BrokerFactory bf = ((EntityManagerFactoryImpl)emf).getBrokerFactory();
Broker broker = bf.newBroker();
assertEquals(DistributedJDBCBrokerFactory.class, bf.getClass());
assertEquals(DistributedBrokerImpl.class, broker.getClass());
assertNotNull(conf.getDistributionPolicyInstance());
emf.createEntityManager();
slices = conf.getActiveSliceNames();
assertTrue(slices.size()>1);
assertTrue(slices.contains("One"));
assertTrue(slices.contains("Two"));
assertFalse(slices.contains("Three"));
conf.getExecutorServiceInstance();
}
protected String getPersistenceUnitName() {
return "per-slice";
}
}

View File

@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.openjpa.slice;
import java.util.List;
import javax.persistence.EntityManager;
import javax.persistence.Query;
import org.apache.openjpa.slice.SlicePersistence;
public class TestQuery extends SliceTestCase {
public void setUp() throws Exception {
super.setUp(PObject.class, Person.class, Address.class);
EntityManager em = emf.createEntityManager();
em.getTransaction().begin();
long id = System.currentTimeMillis();
for (int i=0;i<0;i++) {
PObject pc = new PObject(id++);
pc.setValue(i);
em.persist(pc);
String slice = SlicePersistence.getSlice(pc);
String expected = (i%2 == 0) ? "Even" : "Odd";
assertEquals(expected, slice);
}
em.getTransaction().commit();
}
public void testQueryResultIsOrderedAcrossSlice() {
EntityManager em = emf.createEntityManager();
em.getTransaction().begin();
Query query = em.createQuery("SELECT p.value,p FROM PObject p ORDER BY p.value ASC");
List result = query.getResultList();
Integer old = Integer.MIN_VALUE;
for (Object row:result) {
Object[] line = (Object[])row;
int value = ((Integer)line[0]).intValue();
PObject pc = (PObject)line[1];
assertTrue(value >= old);
old = value;
assertEquals(value, pc.getValue());
}
em.getTransaction().commit();
}
public void testAggregateQuery() {
EntityManager em = emf.createEntityManager();
List result = em.createQuery("SELECT COUNT(p) FROM PObject p").getResultList();
for (Object r:result)
System.err.println(r);
}
protected String getPersistenceUnitName() {
return "ordering";
}
}

View File

@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.openjpa.slice;
import javax.persistence.*;
public class TestXA extends SliceTestCase {
public void setUp() throws Exception {
super.setUp(PObject.class, Person.class, Address.class);
}
public void testEmptyCommit() {
EntityManager em = emf.createEntityManager();
em.getTransaction().begin();
em.getTransaction().commit();
}
public void testEmptyRollback() {
EntityManager em = emf.createEntityManager();
em.getTransaction().begin();
em.getTransaction().rollback();
}
public void testPersistIndependentObjects() {
EntityManager em = emf.createEntityManager();
int before = count(PObject.class);
int N = 2;
long start = System.currentTimeMillis();
em.getTransaction().begin();
for (int i=0; i<N; i++)
em.persist(new PObject(start++));
em.getTransaction().commit();
em.clear();
int after = count(PObject.class);
assertEquals(before+N, after);
}
protected String getPersistenceUnitName() {
return "XA";
}
}

View File

@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.openjpa.slice.policy;
import java.util.List;
import org.apache.openjpa.slice.DistributionPolicy;
import org.apache.openjpa.slice.*;
public class EvenOddDistributionPolicy implements DistributionPolicy {
public String distribute(Object pc, List<String> slices, Object context) {
if (pc instanceof PObject) {
int v = ((PObject)pc).getValue();
return (v%2 == 0) ? "Even" : "Odd";
}
return null;
}
}

View File

@ -0,0 +1,78 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.openjpa.slice.policy;
import java.util.List;
import org.apache.openjpa.slice.DistributionPolicy;
import org.apache.openjpa.slice.PObject;
import org.apache.openjpa.slice.Person;
/**
* Exemplar {@link DistributionPolicy} that maintains closure and distributes
* based on attributes of the given instance.
*
* @author Pinaki Poddar
*
*/
public class UserDistributionPolicy implements DistributionPolicy {
/**
* Distribute the given instance.
* Assumes that two configured slices are named as <em>One</em> and
* <em>Two</em>.<br>
* The policy is only implemented for PObject and Person i.e. two of three
* known classes. No policy is implemented for Address because Address is
* persisted always by cascade and hence Slice should assign automatically
* the same slice as its owner Person.
*
*/
public String distribute(Object pc, List<String> slices, Object context) {
assertValidSlices(slices);
if (pc instanceof PObject)
return distribute((PObject)pc);
if (pc instanceof Person) {
return distribute((Person)pc);
}
throw new RuntimeException("No policy for " + pc.getClass());
}
void assertValidSlices(List<String> slices) {
if (slices.contains("One") && slices.contains("Two"))
return;
throw new RuntimeException("This policy assumes two slices named " +
"One and Two. But configured slices are " + slices);
}
/**
* Distribute PObject based on odd-even value of its id.
*/
String distribute(PObject pc) {
return (pc.getId()%2 == 0) ? "One" : "Two";
}
/**
* Distribute Person based on first character of its name.
*/
String distribute(Person pc) {
return (pc.getName().startsWith("A")) ? "One" : "Two";
}
}

View File

@ -0,0 +1,118 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<persistence xmlns="http://java.sun.com/xml/ns/persistence" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" version="1.0" xsi:schemaLocation="http://java.sun.com/xml/ns/persistence http://java.sun.com/xml/ns/persistence/persistence_1_0.xsd">
<persistence-unit name="enhance">
<class>org.apache.openjpa.slice.PObject</class>
<class>org.apache.openjpa.slice.Person</class>
<class>org.apache.openjpa.slice.Address</class>
</persistence-unit>
<!-- ==================================================================== -->
<!-- Configuration for testing basic per-slice configuration -->
<!-- ==================================================================== -->
<persistence-unit name="per-slice">
<properties>
<property name="openjpa.BrokerFactory" value="slice"/>
<property name="openjpa.slice.Names" value="One,Two, Three"/>
<property name="openjpa.slice.Master" value="One"/>
<property name="openjpa.slice.Lenient" value="true"/>
<property name="openjpa.slice.DistributionPolicy" value="org.apache.openjpa.slice.policy.UserDistributionPolicy"/>
<property name="openjpa.slice.ThreadingPolicy" value="fixed"/>
<property name="openjpa.Log" value="DefaultLevel=WARN, Enhance=TRACE, SQL=TRACE"/>
<property name="openjpa.ConnectionDriverName" value="com.mysql.jdbc.Driver"/>
<property name="openjpa.slice.One.ConnectionURL" value="jdbc:mysql://localhost/slice1"/>
<property name="openjpa.slice.Two.ConnectionURL" value="jdbc:mysql://localhost/slice2"/>
<property name="openjpa.slice.Three.ConnectionURL" value="jdbc:mysql://localhost/slice3"/>
</properties>
</persistence-unit>
<persistence-unit name="XA">
<class>org.apache.openjpa.slice.PObject</class>
<properties>
<property name="openjpa.BrokerFactory" value="slice"/>
<property name="openjpa.ConnectionDriverName" value="com.mysql.jdbc.Driver"/>
<property name="openjpa.slice.Names" value="One,Two"/>
<property name="openjpa.slice.DistributionPolicy" value="org.apache.openjpa.slice.policy.UserDistributionPolicy"/>
<property name="openjpa.slice.TransactionPolicy" value="xa"/>
<property name="openjpa.slice.One.ConnectionDriverName" value="com.mysql.jdbc.jdbc2.optional.MysqlXADataSource"/>
<property name="openjpa.slice.One.ConnectionProperties" value="url=jdbc:mysql://localhost/slice1"/>
<property name="openjpa.slice.Two.ConnectionDriverName" value="com.mysql.jdbc.jdbc2.optional.MysqlXADataSource"/>
<property name="openjpa.slice.Two.ConnectionProperties" value="url=jdbc:mysql://localhost/slice2"/>
<property name="openjpa.Multithreaded" value="false"/>
<property name="openjpa.Log" value="DefaultLevel=WARN, Enhance=TRACE, SQL=TRACE"/>
<property name="openjpa.jdbc.SynchronizeMappings" value="refresh"/>
<property name="openjpa.jdbc.MappingDefaults" value="DefaultMissingInfo=true"/>
<property name="openjpa.RuntimeUnenhancedClasses" value="supported"/>
</properties>
</persistence-unit>
<!-- ==================================================================== -->
<!-- Configuration for testing basic CRUD operations -->
<!-- ==================================================================== -->
<persistence-unit name="slice">
<class>org.apache.openjpa.slice.PObject</class>
<class>org.apache.openjpa.slice.Person</class>
<class>org.apache.openjpa.slice.Address</class>
<properties>
<property name="openjpa.BrokerFactory" value="slice"/>
<property name="openjpa.ConnectionDriverName" value="com.mysql.jdbc.Driver"/>
<property name="openjpa.slice.Names" value="One,Two"/>
<property name="openjpa.slice.Master" value="Two"/>
<property name="openjpa.slice.One.ConnectionURL" value="jdbc:mysql://localhost/slice1"/>
<property name="openjpa.slice.Two.ConnectionURL" value="jdbc:mysql://localhost/slice2"/>
<property name="openjpa.ConnectionUserName" value="root"/>
<property name="openjpa.ConnectionPassword" value="hello"/>
<property name="openjpa.slice.DistributionPolicy" value="org.apache.openjpa.slice.policy.UserDistributionPolicy"/>
<property name="openjpa.slice.Lenient" value="false"/>
<property name="openjpa.Connection2URL" value="jdbc:mysql://localhost/slice1"/>
<property name="openjpa.jdbc.SynchronizeMappings" value="buildSchema"/>
<property name="openjpa.QueryCompilationCache" value="false"/>
<property name="openjpa.Log" value="DefaultLevel=WARN, Enhance=TRACE, SQL=TRACE"/>
<property name="openjpa.jdbc.MappingDefaults" value="DefaultMissingInfo=true"/>
<property name="openjpa.RuntimeUnenhancedClasses" value="supported"/>
</properties>
</persistence-unit>
<persistence-unit name="ordering">
<class>org.apache.openjpa.slice.PObject</class>
<properties>
<property name="openjpa.BrokerFactory" value="slice"/>
<property name="openjpa.ConnectionDriverName" value="com.mysql.jdbc.Driver"/>
<property name="openjpa.slice.Even.ConnectionURL" value="jdbc:mysql://localhost/slice1"/>
<property name="openjpa.slice.Odd.ConnectionURL" value="jdbc:mysql://localhost/slice2"/>
<property name="openjpa.ConnectionUserName" value="root"/>
<property name="openjpa.ConnectionPassword" value="hello"/>
<property name="openjpa.slice.DistributionPolicy" value="org.apache.openjpa.slice.policy.EvenOddDistributionPolicy"/>
<property name="openjpa.slice.Lenient" value="false"/>
<property name="openjpa.Connection2URL" value="jdbc:mysql://localhost/slice1"/>
<property name="openjpa.jdbc.SynchronizeMappings" value="buildSchema"/>
<property name="openjpa.Log" value="DefaultLevel=WARN, Enhance=TRACE, SQL=TRACE"/>
<property name="openjpa.jdbc.MappingDefaults" value="DefaultMissingInfo=true"/>
</properties>
</persistence-unit>
</persistence>