Adding new (source code + pom.xml + doc) for Slice

git-svn-id: https://svn.apache.org/repos/asf/openjpa/trunk@619145 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Pinaki Poddar 2008-02-06 20:26:14 +00:00
parent 2a3ee08d92
commit 12839b6bd9
38 changed files with 5387 additions and 4 deletions

View File

@ -46,7 +46,8 @@
<!ENTITY ref_guide_deploy.xml SYSTEM "ref_guide_deploy.xml">
<!ENTITY ref_guide_runtime.xml SYSTEM "ref_guide_runtime.xml">
<!ENTITY ref_guide_caching.xml SYSTEM "ref_guide_caching.xml">
<!ENTITY ref_guide_remote.xml SYSTEM "ref_guide_remote.xml">
<!ENTITY ref_guide_remote.xml SYSTEM "ref_guide_remote.xml">
<!ENTITY ref_guide_slice.xml SYSTEM "ref_guide_slice.xml">
<!ENTITY ref_guide_integration.xml SYSTEM "ref_guide_integration.xml">
<!ENTITY ref_guide_optimization.xml SYSTEM "ref_guide_optimization.xml">
<!ENTITY samples_guide.xml SYSTEM "samples_guide.xml">
@ -100,7 +101,8 @@
&ref_guide_deploy.xml;
&ref_guide_runtime.xml;
&ref_guide_caching.xml;
&ref_guide_remote.xml;
&ref_guide_remote.xml;
&ref_guide_slice.xml;
&ref_guide_integration.xml;
&ref_guide_optimization.xml;
</part>

View File

@ -0,0 +1,498 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<chapter id="ref_guide_slice">
<title>
Distributed Persistence
</title>
<para>
The standard JPA runtime environment works with a <emphasis>single</emphasis>
database instance. OpenJPA can be extended via plug-in to work with
multiple databases within the same transaction without any change to the
existing application. This capability of OpenJPA for distributed
database environment is called <emphasis>Slice</emphasis> and is explained in
the following sections.
</para>
<section id="slice_overview">
<title>Overview</title>
<para>
Enterprise applications are increasingly deployed for distributed database
environments. The reasons for distributed, often horizontally-partitioned
database environment can be to counter massive data growth, to
support multiple external clients on a hosted platform or many other
practical scenarios that can benefit from data partitioning.
</para>
<para>
Any JPA-based user application has to address serious technical and conceptual
challenges to directly interact with a set of physical databases
within a single transaction.
Slice encapsulates the complexity of distributed database environment
via the abstraction of <emphasis>virtual</emphasis> database which internally
manages multiple physical databases. We refer each physical database instance
as <emphasis>slice</emphasis>.
<emphasis>Virtualization</emphasis> of distributed databases
makes OpenJPA object management kernel and
the user application to work in the same way as in the case of a single physical
database.
</para>
</section>
<section id="Features and Limitations">
<title>Salient Features</title>
<section><title>Transparency</title>
<para>
The existing application or the persistent domain model requires
<emphasis>no change</emphasis> to upgrade from a single database
to a distributed database environment.
</para>
</section>
<section><title>Custom Distribution Policy</title>
<para>
User application decides how the newly persistent instances be
distributed across the database slices. The data
distribution policy across the slices may be based on the attribute
of the data itself. For example, all Customer whose first name begins with
character 'A' to 'M' will be stored in one slice while names
beginning with 'N' to 'Z' will be stored in another slice.
<para>
This custom data distribution policy is specified by implementing
<classname>org.apache.openjpa.slice.DistributionPolicy</classname>
interface by the user application.
</para>
<para>
Slice tracks the original database for existing instances. When
an application issues a query, the resultant instances can be loaded
from different slices. This tracking is important as subsequent
update to any of these instances is committed to the appropriate
original database slice.
</para>
<para>
<warning>Currently, there is no provision for migrating an
existing instance from one slice to another.
</warning>
</para>
</section>
<section><title>Heterogeneous Database</title>
<para>
Each slice can be configured independently with its own JDBC
driver and other connection parameters. Hence the target database
environment can constitute of heterogeneous databases.
</para>
</section>
<section><title>Parallel Execution</title>
<para>
All database operations such as query, commit or flush operates
in parallel across the database slices. The execution threading
policy is configurable.
</para>
</section>
<section><title>Distributed Query</title>
<para>
The queries are executed across all slices and the results are
merged into a single list. The query result that includes
<code>ORDER BY</code> clause are sorted correctly by merging
results from each individual slice.
</para>
The queries that specify an aggregate projection such as
<code>COUNT()</code>, <code>MAX()</code>, <code>MIN()</code>
and <code>SUM()</code>
are correctly evaluated <emphasis>only if</emphasis> they
return a single result.
<para>
</para>
<para>
<warning>
The aggregate operation <code>AVG()</code> is not supported.
</warning>
</para>
</section>
<section><title>Distributed Transaction</title>
<para>
The database slices participate in a global transaction provided
each slice is configured with a XA-complaint JDBC driver, even
when the persistence unit is configured for <code>RESOURCE_LOCAL</code>
transaction.
</para>
<para>
<warning>
If any of the configured slices is not XA-complaint <emphasis>and</emphasis>
the persistence unit is configured for <code>RESOURCE_LOCAL</code>
transaction then each slice is committed without any two-phase
commit protocol. If commit on any slice fails, then atomic nature of
the transaction is not ensured.
</warning>
</para>
</section>
<section id="collocation_constraint"><title>Collocation Constraint</title>
<para>
No relationship can exist across database slices. In O-R mapping parlance,
this condition translates to the limitation that the closure of an object graph must be
<emphasis>collocated</emphasis> in the same database.
For example, consider a domain model where Person relates to Adress.
Person X refers to Address A while Person Y refers to Address B.
Collocation Constraint means that <emphasis>both</emphasis> X and A
must be stored in the same
database slice. Similarly Y and B must be stored in a single slice.
</para>
<para>
Slice, however, helps to maintain collocation constraint automatically.
The instances in the closure set of any newly persistent instance
reachable via cascaded relationship is stored in the same slice.
The user-defined distribution policy requires to supply the slice
for the root instance only.
</para>
</section>
</section>
<section id="slice_configuration">
<title>Usage</title>
<para>
Slice is activated via the following property settings:
</para>
<section>
<title>How to activate Slice Runtime?</title>
<para>
The basic configuration property is
<programlisting>
<![CDATA[ <property name="openjpa.BrokerFactory" value="slice"/>]]>
</programlisting>
This critical configuration activates a specialized factory class aliased
as <code>slice</code> to create object management kernel that
can work against multiple databases.
</para>
</section>
<section>
<title>How to configure each database slice?</title>
<para>
Each database slice is identified by a logical name unique within a
persistent unit. The list of the slices is specified by <code>slice.Names</code> property.
For example, specify three slices named <code>"One"</code>,
<code>"Two"</code> and <code>"Three"</code> as follows:
<programlisting>
<![CDATA[ <property name="slice.Names" value="One, Two, Three"/>]]>
</programlisting>
</para>
<para>
This property is not mandatory. If this property is not specified then
the configuration is scanned for logical slice names. Any property
of the form <code>slice.XYZ.abc</code> will register a slice with logical
name <code>"XYZ"</code>.
</para>
<para>
The order of the names can be significant if no <code>slice.Master</code>
property is specified.
</para>
<para>
Each database slice can be configured independently. For example, the
following configuration will register two slices with logical name
<code>One</code> and <code>Two</code>.
<programlisting>
<![CDATA[<property name="slice.One.ConnectionURL" value="jdbc:mysql:localhost//slice1"/>
<property name="slice.Two.ConnectionURL" value="jdbc:mysql:localhost//slice2"/>]]>
</programlisting>
</para>
<para>
Any OpenJPA specific property can be configured per slice basis.
For example, the following configuration will use two different JDBC
drivers for slice <code>One</code> and <code>Two</code>.
<programlisting>
<![CDATA[<property name="slice.One.ConnectionDriverName" value="com.mysql.jdbc.Driver"/>
<property name="slice.Two.ConnectionDriverName" value="com.mysql.jdbc.jdbc2.optional.MysqlXADataSource"/>]]>
</programlisting>
</para>
<para>
Any property if unspecified for a particular slice will be defaulted by
corresponding OpenJPA property. For example, consider following three slices
<programlisting>
<![CDATA[<property name="slice.One.ConnectionURL" value="jdbc:mysql:localhost//slice1"/>
<property name="slice.Two.ConnectionURL" value="jdbc:mysql:localhost//slice2"/>
<property name="slice.Three.ConnectionURL" value="jdbc:oracle:localhost//slice3"/>
<property name="openjpa.ConnectionDriverName" value="com.mysql.jdbc.Driver"/>
<property name="slice.Three.ConnectionDriverName" value="oracle.jdbc.Driver"/>]]>
</programlisting>
In this example, <code>Three</code> will use slice-specific
<code>oracle.jdbc.Driver</code> driver while slice
<code>One</code> and <code>Two</code> will use
the driver <code>com.mysql.jdbc.Driver</code> as
specified by <code>openjpa.ConnectionDriverName</code>
property value.
</para>
</section>
<section id="distribution_policy">
<title>Implement DistributionPolicy interface</title>
<para>
Slice needs to determine which slice will persist a new instance.
The application can only decide this policy (for example,
all PurchaseOrders before April 30 goes to slice <code>One</code>,
all the rest goes to slice <code>Two</code>). This is why
the application has to implement
<code>org.apache.openjpa.slice.DistributionPolicy</code> and
specify the implementation class in configuration
<programlisting>
<![CDATA[ <property name="slice.DistributionPolicy" value="com.acme.foo.MyOptimialDistributionPolicy"/>]]>
</programlisting>
</para>
<para>
The interface <code>org.apache.openjpa.slice.DistributionPolicy</code>
is simple with a single method. The complete listing of the
documented interface follows:
<programlisting>
<![CDATA[
public interface DistributionPolicy {
/**
* Gets the name of the slice where a given instance will be stored.
*
* @param pc The newly persistent or to-be-merged object.
* @param slices name of the configured slices.
* @param context persistence context managing the given instance.
*
* @return identifier of the slice. This name must match one of the
* configured slice names.
* @see DistributedConfiguration#getSliceNames()
*/
String distribute(Object pc, List<String> slices, Object context);
}
]]>
</programlisting>
</para>
<para>
While implementing a distribution policy the most important thing to
remember is <link linkend="collocation_constraint">collocation constraint</link>.
Because Slice can not establish or query any cross-database relationship, all the
related instances must be stored in the same database slice.
Slice can determine the closure of a root object by traversal of
cascaded relationships. Hence user-defined policy has to only decide the
database for the root instance that is the explicit argument to
<code>EntityManager.persist()</code> call.
Slice will ensure that all other related instances that gets persisted by cascade
is assigned to the same database slice as that of the root instance.
However, the user-defined distribution policy must return the
same slice identifier for the instances that are logically related but
not cascaded for persist.
</para>
</section>
<section>
</section>
</section>
<title>Configuration Properties</title>
<para>
The properties to configure Slice can be classified in two broad groups.
The <emphasis>global</emphasis> properties apply to all the slices, for example,
the thread pool used to execute the queries in parallel or the transaction
manager used to coordinate transaction across multiple slices.
The <emphasis>per-slice</emphasis> properties apply to individual slice, for example,
the JDBC connection URL of a slice.
</para>
<section>
<title>Global Properties</title>
<section>
<title>slice.DistributionPolicy</title>
<para>
This <emphasis>mandatory</emphasis> plug-in property determines how newly
persistent instances are distributed across individual slices.
The value of this property is a fully-qualified class name that implements
<ulink url="../javadoc/org/apache/openjpa/slice/DistributionPolicy.html">
<classname>org.apache.openjpa.slice.DistributionPolicy</classname>
</ulink> interface.
</para>
</section>
<section><title>slice.Lenient</title>
<para>
This boolean plug-in property controls the behavior when one or more slice
can not be connected or unavailable for some other reasons.
If <code>true</code>, the unreachable slices are ignored. If
<code>false</code> then any unreachable slice will raise an exception
during startup.
</para>
<para>
By default this value is set to <code>false</code> i.e. all configured
slices must be available.
</para>
</section>
<section>
<title>slice.Master</title>
<para>
This plug-in property can be used to identify the name of the master slice.
Master slice is used when a primary key is to be generated from a database sequence.
</para>
<para>
By default the master slice is the first slice in the list of configured slice names.
</para>
<para>
<warning>
Currently, there is no provision to use sequence from
multiple database slices.
</warning>
</para>
</section>
<section>
<title>slice.Names</title>
<para>
This plug-in property can be used to register the logical slice names.
The value of this property is comma-separated list of slice names.
The ordering of the names in this list is
<emphasis>significant</emphasis> because
<link linkend="distribution_policy">DistributionPolicy</link> receives
the input argument of the slice names in the same order.
</para>
<para>
If logical slice names are not registered explicitly via this property,
then all logical slice names available in the persistence unit are
registered. The ordering of the slice names in this case is alphabetical.
</para>
<para>
If logical slice names are registered explicitly via this property, then
any logical slice that is available in the persistence unit but excluded
from this list is ignored.
</para>
</section>
<section>
<title>slice.ThreadingPolicy</title>
<para>
This plug-in property determines the nature of thread pool being used
for database operations such as query or flush on individual slices.
The value of the property is a
fully-qualified class name that implements
<ulink url="http://java.sun.com/j2se/1.5.0/docs/api/java/util/concurrent/ExecutorService.html">
<classname>java.util.concurrent.ExecutorService</classname>
</ulink> interface.
Two pre-defined pools can be chosen via their aliases namely
<code>fixed</code> or <code>cached</code>.
</para>
<para>
The pre-defined alias <code>cached</code> activates a
<ulink url="http://java.sun.com/j2se/1.5.0/docs/api/java/util/concurrent/Executors.html#newCachedThreadPool()">cached thread pool</ulink>.
A cached thread pool creates new threads as needed, but will reuse
previously constructed threads when they are available. This pool
is suitable in scenarios that execute many short-lived asynchronous tasks.
The way Slice uses the thread pool to execute database operations is
akin to such scenario and hence <code>cached</code> is the default
value for this plug-in property.
</para>
<para>
The <code>fixed</code> alias activates a
<ulink url="http://java.sun.com/j2se/1.5.0/docs/api/java/util/concurrent/Executors.html#newFixedThreadPool(int)">fixed thread pool</ulink>.
The fixed thread pool can be further parameterized with
<code>CorePoolSize</code>, <code>MaximumPoolSize</code>,
<code>KeepAliveTime</code> and <code>RejectedExecutionHandler</code>.
The meaning of these parameters are described in
<ulink url="http://java.sun.com/j2se/1.5.0/docs/api/java/util/concurrent/ThreadPoolExecutor.html">JavaDoc</ulink>.
The users can exercise finer control on thread pool behavior via these
parameters.
By default, the core pool size is <code>10</code>, maximum pool size is
also <code>10</code>, keep alive time is <code>60</code> seconds and
rejected execution is
<ulink url="http://java.sun.com/j2se/1.5.0/docs/api/java/util/concurrent/ThreadPoolExecutor.AbortPolicy.html">aborted</ulink>.
</para>
<para>
Both of the pre-defined aliases can be parameterized with a fully-qualified
class name that implements
<ulink url="http://java.sun.com/j2se/1.5.0/docs/api/java/util/concurrent/ThreadFactory.html">
<classname>java.util.concurrent.ThreadFactory</classname>
</ulink> interface.
</para>
</section>
<section>
<title>slice.TransactionPolicy</title>
<para>
This plug-in property determines the policy for transaction commit
across multiple slices. The value of this property is a fully-qualified
class name that implements
<ulink url="http://java.sun.com/j2ee/sdk_1.3/techdocs/api/javax/transaction/TransactionManager.html">
<classname>javax.transaction.TransactionManager</classname>
</ulink> interface.
</para>
<para>
Three pre-defined policies can be chosen
by their aliases namely <code>default</code>,
<code>xa</code> and <code>jndi</code>.
</para>
<para>
The <code>default</code> policy employs
a Transaction Manager that commits or rolls back transaction on individual
slices <emphasis>without</emphasis> a two-phase commit protocol.
It does <emphasis>not</emphasis>
guarantee atomic nature of transaction across all the slices because if
one or more slice fails to commit, there is no way to rollback the transaction
on other slices that committed successfully.
</para>
<para>
The <code>xa</code> policy employs a Transaction Manager that that commits
or rolls back transaction on individual
slices using a two-phase commit protocol. The prerequisite to use this scheme
is, of course, that all the slices must be configured to use
XA-complaint JDBC driver.
</para>
<para>
The <code>jndi</code> policy employs a Transaction Manager by looking up the
JNDI context. The prerequisite to use this transaction
manager is, of course, that all the slices must be configured to use
XA-complaint JDBC driver.
<warning>This JNDI based policy is not available currently.</warning>
</para>
</section>
</section>
<section>
<title>Per-Slice Properties</title>
<para>
Any OpenJPA property can be configured for each individual slice. The property name
is of the form <code>slice.[Logical slice name].[OpenJPA Property Name]</code>.
For example, <code>slice.One.ConnectionURL</code> where <code>One</code>
is the logical slice name and <code>ConnectionURL</code> is a OpenJPA property
name.
</para>
<para>
If a property is not configured for a specific slice, then the value for
the property equals to the corresponding <code>openjpa.*</code> property.
</para>
</section>
</chapter>

61
openjpa-slice/pom.xml Normal file
View File

@ -0,0 +1,61 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.apache.openjpa</groupId>
<artifactId>openjpa-slice</artifactId>
<packaging>jar</packaging>
<name>OpenJPA Slice</name>
<description>OpenJPA Slice</description>
<url>http://openjpa.apache.org</url>
<parent>
<groupId>org.apache.openjpa</groupId>
<artifactId>openjpa-parent</artifactId>
<version>1.1.0-SNAPSHOT</version>
</parent>
<dependencies>
<dependency>
<groupId>org.apache.openjpa</groupId>
<artifactId>openjpa-kernel</artifactId>
<version>${pom.version}</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.openjpa</groupId>
<artifactId>openjpa-jdbc</artifactId>
<version>${pom.version}</version>
<scope>compile</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.5</source>
<target>1.5</target>
</configuration>
</plugin>
</plugins>
</build>
</project>

View File

@ -0,0 +1,82 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.openjpa.slice;
import org.apache.openjpa.kernel.BrokerImpl;
import org.apache.openjpa.kernel.OpCallbacks;
import org.apache.openjpa.kernel.OpenJPAStateManager;
import org.apache.openjpa.lib.util.Localizer;
import org.apache.openjpa.util.UserException;
/**
* A specialized Broker to associate slice identifiers with the StateManagers as
* they are persisted in a cascade. This intervention helps the user to define
* distribution policy only for root instances i.e. the instances that are
* explicit argument to persist() call. The cascaded instances are assigned the
* same slice to honor collocation constraint.
*
* @author Pinaki Poddar
*
*/
@SuppressWarnings("serial")
public class DistributedBrokerImpl extends BrokerImpl {
private transient String slice;
private static final Localizer _loc =
Localizer.forPackage(DistributedBrokerImpl.class);
/**
* Assigns slice identifier to the resultant StateManager as initialized by
* the super class implementation. The slice identifier is decided by
* {@link DistributionPolicy} for given <code>pc</code> if it is a root
* instance i.e. the argument of the user application's persist() call. The
* cascaded instances are detected by non-empty status of the current
* operating set. The slice is assigned only if a StateManager has never
* been assigned before.
*/
public OpenJPAStateManager persist(Object pc, Object id, boolean explicit,
OpCallbacks call) {
OpenJPAStateManager sm = getStateManager(pc);
if (getOperatingSet().isEmpty()
&& (sm == null || sm.getImplData() == null)) {
slice = getSlice(pc);
}
sm = super.persist(pc, id, explicit, call);
if (sm.getImplData() == null)
sm.setImplData(slice, true);
return sm;
}
/**
* Gets the slice by the user-defined distribution policy.
*/
String getSlice(Object pc) {
DistributedConfiguration conf =
(DistributedConfiguration) getConfiguration();
String slice =
(conf.getDistributionPolicyInstance().distribute(pc, conf
.getActiveSliceNames(), this));
if (!conf.getActiveSliceNames().contains(slice))
throw new UserException(_loc.get("bad-policy-slice", new Object[] {
conf.getDistributionPolicyInstance().getClass().getName(),
slice, pc, conf.getActiveSliceNames() }));
return slice;
}
}

View File

@ -0,0 +1,78 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.openjpa.slice;
import java.util.List;
import org.apache.openjpa.conf.OpenJPAConfiguration;
/**
* A configuration for multiple data stores, each referred as <em>slice</em>.
* This configuration allows each underlying slice be configured with its
* own specific configuration properties such as JDBC Driver or connection
* user/password etc. <br>
* This configuration also extends by adding a {@link DistributionPolicy
* DistributionPolicy} that governs how new instances be distributed
* among the slices.
*
* @author Pinaki Poddar
*
*/
public interface DistributedConfiguration extends OpenJPAConfiguration {
/**
* Gets the active slice identifiers. This list is determined by the
* configuration properties either by explicit listing in
* <code>slice.Names</code> property or by scanning <code>slice.*.*</code>
* properties.
* <br>
* The ordering of the slice identifiers is determined when they are
* specified explicitly in <code>slice.Names</code> property or
* ordered alphabetically when found by scanning the properties.
* <br>
* This list always returns the identifiers that are <em>active</em>, slices
* that can not be connected to are not included in this list.
*/
List<String> getActiveSliceNames();
/**
* Gets the available slice identifiers irrespective of their status.
* @return
*/
List<String> getAvailableSliceNames();
/**
* Gets the slices of given status.
* @param statuses list of status flags. If null, returns all slices
* irrespective of status;
*/
List<Slice> getSlices(Slice.Status...statuses);
/**
* Gets the Slice for a given name.
* Exception is raised if the given slice is not configured.
*/
Slice getSlice(String sliceName);
/**
* Gets the policy that governs how new instances will be distributed across
* the available slices.
*/
DistributionPolicy getDistributionPolicyInstance();
}

View File

@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.openjpa.slice;
import java.util.List;
/**
* Policy to select one of the physical databases referred as <em>slice</em>
* in which a given persistent instance will be stored.
*
* @author Pinaki Poddar
*
*/
public interface DistributionPolicy {
/**
* Gets the name of the slice where a given instance will be stored.
*
* @param pc The newly persistent or to-be-merged object.
* @param slices list of names of the configured slices. The ordering of
* the list is either explicit (by the <code>slice.Names</code> property)
* or implicit i.e. alphabetic if <code>slice.Names</code> is unspecified.
* @param context generic persistence context managing the given instance.
*
* @return identifier of the slice. This name must match one of the
* configured slice names.
* @see DistributedConfiguration#getActiveSliceNames()
*/
String distribute(Object pc, List<String> slices, Object context);
}

View File

@ -0,0 +1,112 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.openjpa.slice;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.openjpa.event.RemoteCommitEventManager;
import org.apache.openjpa.event.RemoteCommitProvider;
import org.apache.openjpa.lib.conf.Configuration;
import org.apache.openjpa.lib.conf.Configurations;
import org.apache.openjpa.lib.conf.PluginValue;
import org.apache.openjpa.lib.util.Localizer;
import org.apache.openjpa.lib.util.Options;
import org.apache.openjpa.util.UserException;
/**
* Value type used to represent a {@link ExecutorService}.
* This value controls the thread pool parameters. The thread pool is used
* to execute the queries.
*
* @author Pinaki Poddar
* @nojavadoc
*/
public class ExecutorServiceValue extends PluginValue {
private static List<String> known =
Arrays.asList(new String[] { "cached", "fixed" });
private static Localizer _loc =
Localizer.forPackage(ExecutorServiceValue.class);
public ExecutorServiceValue() {
super("ThreadingPolicy", true);
setDefault("cached");
}
public void setProperties(String props) {
super.setProperties(props);
}
/**
* Configures a cached or fixed thread pool.
*/
@Override
public Object instantiate(Class type, Configuration conf, boolean fatal) {
Object obj = null;
int defaultSize = 10;
String cls = getClassName();
if (!known.contains(cls))
cls = "cached";
Options opts = Configurations.parseProperties(getProperties());
ThreadFactory factory = null;
if (opts.containsKey("ThreadFactory")) {
String fName = opts.getProperty("ThreadFactory");
try {
factory = (ThreadFactory) Class.forName(fName).newInstance();
Configurations.configureInstance(factory, conf, opts,
getProperty());
} catch (Throwable t) {
throw new UserException(_loc.get("bad-thread-factory", fName), t);
} finally {
opts.removeProperty("ThreadFactory");
}
} else {
factory = Executors.defaultThreadFactory();
}
if ("cached".equals(cls)) {
obj = Executors.newCachedThreadPool(factory);
} else if ("fixed".equals(cls)) {
long keepAliveTime = 60L;
if (opts.containsKey("KeepAliveTime")) {
keepAliveTime = opts.getLongProperty("KeepAliveTime");
opts.removeLongProperty("KeepAliveTime");
}
obj = new ThreadPoolExecutor(defaultSize, defaultSize,
keepAliveTime, TimeUnit.SECONDS,
new PriorityBlockingQueue<Runnable>(), factory);
Configurations.configureInstance(obj, conf, opts, getProperty());
}
set(obj, true);
return obj;
}
}

View File

@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.openjpa.slice;
import java.util.Map;
import org.apache.openjpa.conf.OpenJPAProductDerivation;
import org.apache.openjpa.lib.conf.AbstractProductDerivation;
import org.apache.openjpa.slice.jdbc.DistributedJDBCBrokerFactory;
/**
* Derives configuration for Slice.
* Introduces a specialized BrokerFactory aliased as <code>slice</code>.
* All Slice specific configuration is prefixed as <code>slice.XXX</code>
*
* @author Pinaki Poddar
*
*/
public class ProductDerivation extends AbstractProductDerivation implements
OpenJPAProductDerivation {
@SuppressWarnings("unchecked")
public void putBrokerFactoryAliases(Map m) {
m.put("slice", DistributedJDBCBrokerFactory.class.getName());
}
public String getConfigurationPrefix() {
return "slice";
}
public int getType() {
return TYPE_FEATURE;
}
}

View File

@ -0,0 +1,98 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.openjpa.slice;
import org.apache.openjpa.conf.OpenJPAConfiguration;
/**
* Represents a database slice of immutable logical name.
*
* @author Pinaki Poddar
*
*/
public class Slice implements Comparable<Slice> {
public enum Status {
NOT_INITIALIZED,
ACTIVE,
INACTIVE, // configured but not available
EXCLUDED // configured but not used
};
private final String name;
private transient final OpenJPAConfiguration conf;
private transient Status status;
/**
* Supply the logical name and configuration.
*/
public Slice(String name, OpenJPAConfiguration conf) {
this.name = name;
this.conf = conf;
this.status = Status.NOT_INITIALIZED;
}
/**
* Gets the immutable logical name.
*/
public String getName() {
return name;
}
public OpenJPAConfiguration getConfiguration() {
return conf;
}
public Status getStatus() {
return status;
}
public void setStatus(Status status) {
this.status = status;
}
public boolean isActive() {
return status == Status.ACTIVE;
}
public String toString() {
return name;
}
public int compareTo(Slice other) {
return name.compareTo(other.name);
}
/**
* Equals by name.
*/
@Override
public boolean equals(Object other) {
if (this == other) return true;
if (other == null) return false;
if (other instanceof Slice) {
return name.equals(((Slice)other).getName());
}
return false;
}
@Override
public int hashCode() {
return name.hashCode();
}
}

View File

@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.openjpa.slice;
import org.apache.openjpa.enhance.PersistenceCapable;
import org.apache.openjpa.kernel.StateManagerImpl;
import org.apache.openjpa.util.ImplHelper;
/**
* A helper to determine the slice identifier of an instance.
*
* @author Pinaki Poddar
*
*/
public class SlicePersistence {
/**
* Get the slice identifier for the given instance if it is a managed
* instance and has been assigned to a slice.
*
* @return name of the slice, if any. null otherwise.
*/
public static String getSlice(Object obj) {
if (obj == null)
return null;
PersistenceCapable pc = ImplHelper.toPersistenceCapable(obj, null);
if (pc == null)
return null;
StateManagerImpl sm = (StateManagerImpl)pc.pcGetStateManager();
if (sm == null)
return null;
Object slice = sm.getImplData();
return (slice instanceof String) ? (String)slice : null;
}
}

View File

@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.openjpa.slice;
import java.io.InputStream;
import java.util.Properties;
public class SliceVersion {
public static final String VERSION;
public static final String REVISION;
static {
Properties revisionProps = new Properties();
try {
InputStream in = SliceVersion.class.getResourceAsStream
("/META-INF/org.apache.openjpa.slice.revision.properties");
if (in != null) {
try {
revisionProps.load(in);
} finally {
in.close();
}
}
} catch (Exception e) {
}
VERSION = revisionProps.getProperty("slice.version", "0.0.0");
REVISION = revisionProps.getProperty("revision.number");
}
public static void main(String[] args) {
System.out.println(new SliceVersion());
}
public String toString() {
return "Slice Version " + VERSION + " [revision "+REVISION+"]";
}
}

View File

@ -0,0 +1,120 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.openjpa.slice.jdbc;
import org.apache.openjpa.kernel.StoreQuery;
import org.apache.openjpa.kernel.exps.QueryExpressions;
import org.apache.openjpa.kernel.exps.Value;
import org.apache.openjpa.lib.rop.ResultObjectProvider;
import org.apache.openjpa.util.InternalException;
public class AggregateResultObjectProvider implements ResultObjectProvider {
private final ResultObjectProvider[] _rops;
private final StoreQuery _query;
private final QueryExpressions[] _exps;
private Object _single;
private boolean _opened;
public AggregateResultObjectProvider(ResultObjectProvider[] rops,
StoreQuery q, QueryExpressions[] exps) {
_rops = rops;
_query = q;
_exps = exps;
}
public boolean absolute(int pos) throws Exception {
return false;
}
public void close() throws Exception {
_opened = false;
for (ResultObjectProvider rop:_rops)
rop.close();
}
public Object getResultObject() throws Exception {
if (!_opened)
throw new InternalException(this + " not-open");
return _single;
}
public void handleCheckedException(Exception e) {
_rops[0].handleCheckedException(e);
}
public boolean next() throws Exception {
if (!_opened) {
open();
}
if (_single != null)
return false;
Value[] values = _exps[0].projections;
Object[] single = new Object[values.length];
for (int i=0; i<values.length; i++) {
Value v = values[i];
boolean isAggregate = v.isAggregate();
int op = decideOperationType(v);
for (ResultObjectProvider rop:_rops) {
rop.next();
Object[] row = (Object[]) rop.getResultObject();
switch (op) {
case 2: single[i] = count(single[i],row[i]);
break;
default : single[i] = row[i];
}
}
}
_single = single;
return true;
}
int decideOperationType(Value v) {
String cls = v.getClass().getName();
if (cls.equals("org.apache.openjpa.jdbc.kernel.exps.Sum"))
return 1;
if (cls.equals("org.apache.openjpa.jdbc.kernel.exps.Count"))
return 2;
return 0;
}
long count(Object current, Object other) {
if (current == null)
return (Long) other;
return (Long)current + (Long)other;
}
public void open() throws Exception {
for (ResultObjectProvider rop:_rops)
rop.open();
_opened = true;
}
public void reset() throws Exception {
}
public int size() throws Exception {
return 1;
}
public boolean supportsRandomAccess() {
return false;
}
}

View File

@ -0,0 +1,263 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.openjpa.slice.jdbc;
import java.sql.CallableStatement;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.SQLWarning;
import java.sql.Savepoint;
import java.sql.Statement;
import java.util.List;
import java.util.Map;
/**
* A virtual connection that contains multiple physical connections.
*
* @author Pinaki Poddar
*
*/
class DistributedConnection implements Connection {
private final List<Connection> real;
private final Connection master;
public DistributedConnection(List<Connection> connections) {
if (connections == null || connections.isEmpty())
throw new NullPointerException();
real = connections;
master = connections.get(0);
}
public boolean contains(Connection c) {
return real.contains(c);
}
public void clearWarnings() throws SQLException {
for (Connection c : real)
c.clearWarnings();
}
public void close() throws SQLException {
for (Connection c : real)
c.close();
}
public void commit() throws SQLException {
for (Connection c : real)
c.commit();
}
public Statement createStatement() throws SQLException {
DistributedStatement ret = new DistributedStatement(this);
for (Connection c : real) {
ret.add(c.createStatement());
}
return ret;
}
public Statement createStatement(int arg0, int arg1) throws SQLException {
DistributedStatement ret = new DistributedStatement(this);
for (Connection c : real) {
ret.add(c.createStatement(arg0, arg1));
}
return ret;
}
public Statement createStatement(int arg0, int arg1, int arg2)
throws SQLException {
DistributedStatement ret = new DistributedStatement(this);
for (Connection c : real) {
ret.add(c.createStatement(arg0, arg1, arg2));
}
return ret;
}
public boolean getAutoCommit() throws SQLException {
return master.getAutoCommit();
}
public String getCatalog() throws SQLException {
return master.getCatalog();
}
public int getHoldability() throws SQLException {
return master.getHoldability();
}
public DatabaseMetaData getMetaData() throws SQLException {
return master.getMetaData();
}
public int getTransactionIsolation() throws SQLException {
return master.getTransactionIsolation();
}
public Map<String, Class<?>> getTypeMap() throws SQLException {
return master.getTypeMap();
}
public SQLWarning getWarnings() throws SQLException {
return master.getWarnings();
}
public boolean isClosed() throws SQLException {
boolean ret = false;
for (Connection c : real) {
ret &= c.isClosed();
}
return ret;
}
public boolean isReadOnly() throws SQLException {
boolean ret = false;
for (Connection c : real) {
ret &= c.isReadOnly();
}
return ret;
}
public String nativeSQL(String arg0) throws SQLException {
return master.nativeSQL(arg0);
}
public CallableStatement prepareCall(String arg0) throws SQLException {
throw new UnsupportedOperationException();
}
public CallableStatement prepareCall(String arg0, int arg1, int arg2)
throws SQLException {
throw new UnsupportedOperationException();
}
public CallableStatement prepareCall(String arg0, int arg1, int arg2,
int arg3) throws SQLException {
throw new UnsupportedOperationException();
}
public PreparedStatement prepareStatement(String arg0) throws SQLException {
// TODO: Big hack
if (arg0.startsWith("SELECT SEQUENCE_VALUE FROM OPENJPA_SEQUENCE_TABLE"))
return master.prepareStatement(arg0);
DistributedPreparedStatement ret = new DistributedPreparedStatement(this);
for (Connection c : real) {
ret.add(c.prepareStatement(arg0));
}
return ret;
}
public PreparedStatement prepareStatement(String arg0, int arg1)
throws SQLException {
DistributedPreparedStatement ret = new DistributedPreparedStatement(this);
for (Connection c : real) {
ret.add(c.prepareStatement(arg0, arg1));
}
return ret;
}
public PreparedStatement prepareStatement(String arg0, int[] arg1)
throws SQLException {
DistributedPreparedStatement ret = new DistributedPreparedStatement(this);
for (Connection c : real) {
ret.add(c.prepareStatement(arg0, arg1));
}
return ret;
}
public PreparedStatement prepareStatement(String arg0, String[] arg1)
throws SQLException {
DistributedPreparedStatement ret = new DistributedPreparedStatement(this);
for (Connection c : real) {
ret.add(c.prepareStatement(arg0, arg1));
}
return ret;
}
public PreparedStatement prepareStatement(String arg0, int arg1, int arg2)
throws SQLException {
DistributedPreparedStatement ret = new DistributedPreparedStatement(this);
for (Connection c : real) {
ret.add(c.prepareStatement(arg0, arg1, arg2));
}
return ret;
}
public PreparedStatement prepareStatement(String arg0, int arg1, int arg2,
int arg3) throws SQLException {
DistributedPreparedStatement ret = new DistributedPreparedStatement(this);
for (Connection c : real) {
ret.add(c.prepareStatement(arg0, arg1, arg2));
}
return ret;
}
public void releaseSavepoint(Savepoint arg0) throws SQLException {
for (Connection c : real)
c.releaseSavepoint(arg0);
}
public void rollback() throws SQLException {
for (Connection c : real)
c.rollback();
}
public void rollback(Savepoint arg0) throws SQLException {
for (Connection c : real)
c.rollback(arg0);
}
public void setAutoCommit(boolean arg0) throws SQLException {
for (Connection c : real)
c.setAutoCommit(arg0);
}
public void setCatalog(String arg0) throws SQLException {
for (Connection c : real)
c.setCatalog(arg0);
}
public void setHoldability(int arg0) throws SQLException {
for (Connection c : real)
c.setHoldability(arg0);
}
public void setReadOnly(boolean arg0) throws SQLException {
for (Connection c : real)
c.setReadOnly(arg0);
}
public Savepoint setSavepoint() throws SQLException {
throw new UnsupportedOperationException();
}
public Savepoint setSavepoint(String arg0) throws SQLException {
throw new UnsupportedOperationException();
}
public void setTransactionIsolation(int arg0) throws SQLException {
for (Connection c : real)
c.setTransactionIsolation(arg0);
}
public void setTypeMap(Map<String, Class<?>> arg0) throws SQLException {
for (Connection c : real)
c.setTypeMap(arg0);
}
}

View File

@ -0,0 +1,102 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.openjpa.slice.jdbc;
import java.io.PrintWriter;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import javax.sql.DataSource;
import javax.sql.XADataSource;
import org.apache.openjpa.lib.jdbc.DecoratingDataSource;
/**
* A virtual datasource that contains many physical datasources.
*
* @author Pinaki Poddar
*
*/
class DistributedDataSource extends DecoratingDataSource implements
Iterable<DataSource> {
private List<DataSource> real = new ArrayList<DataSource>();
private DataSource master;
public DistributedDataSource(List<DataSource> dataSources) {
super(dataSources.get(0));
real = dataSources;
master = dataSources.get(0);
}
Connection getConnection(DataSource ds) throws SQLException {
if (ds instanceof DecoratingDataSource)
return getConnection(((DecoratingDataSource)ds).getInnermostDelegate());
if (ds instanceof XADataSource)
return ((XADataSource)ds).getXAConnection().getConnection();
return ds.getConnection();
}
Connection getConnection(DataSource ds, String user, String pwd) throws SQLException {
if (ds instanceof DecoratingDataSource)
return getConnection(((DecoratingDataSource)ds).getInnermostDelegate(), user, pwd);
if (ds instanceof XADataSource)
return ((XADataSource)ds).getXAConnection(user, pwd).getConnection();
return ds.getConnection(user, pwd);
}
public Iterator<DataSource> iterator() {
return real.iterator();
}
public Connection getConnection() throws SQLException {
List<Connection> c = new ArrayList<Connection>();
for (DataSource ds : real)
c.add(ds.getConnection());
return new DistributedConnection(c);
}
public Connection getConnection(String username, String password)
throws SQLException {
List<Connection> c = new ArrayList<Connection>();
for (DataSource ds : real)
c.add(ds.getConnection(username, password));
return new DistributedConnection(c);
}
public PrintWriter getLogWriter() throws SQLException {
return master.getLogWriter();
}
public int getLoginTimeout() throws SQLException {
return master.getLoginTimeout();
}
public void setLogWriter(PrintWriter out) throws SQLException {
for (DataSource ds:real)
ds.setLogWriter(out);
}
public void setLoginTimeout(int seconds) throws SQLException {
for (DataSource ds:real)
ds.setLoginTimeout(seconds);
}
}

View File

@ -0,0 +1,107 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.openjpa.slice.jdbc;
import java.util.Map;
import org.apache.openjpa.conf.OpenJPAVersion;
import org.apache.openjpa.jdbc.conf.JDBCConfiguration;
import org.apache.openjpa.jdbc.kernel.JDBCBrokerFactory;
import org.apache.openjpa.kernel.Bootstrap;
import org.apache.openjpa.kernel.StoreManager;
import org.apache.openjpa.lib.conf.ConfigurationProvider;
import org.apache.openjpa.lib.util.Localizer;
import org.apache.openjpa.slice.SliceVersion;
/**
* A factory for distributed JDBC datastores.
*
* @author Pinaki Poddar
*
*/
@SuppressWarnings("serial")
public class DistributedJDBCBrokerFactory extends JDBCBrokerFactory {
private static final Localizer _loc =
Localizer.forPackage(DistributedJDBCBrokerFactory.class);
/**
* Factory method for constructing a factory from properties. Invoked from
* {@link Bootstrap#newBrokerFactory}.
*/
public static DistributedJDBCBrokerFactory newInstance(
ConfigurationProvider cp) {
DistributedJDBCConfigurationImpl conf =
new DistributedJDBCConfigurationImpl(cp);
cp.setInto(conf);
return new DistributedJDBCBrokerFactory(conf);
}
/**
* Factory method for obtaining a possibly-pooled factory from properties.
* Invoked from {@link Bootstrap#getBrokerFactory}.
*/
public static JDBCBrokerFactory getInstance(ConfigurationProvider cp) {
Map properties = cp.getProperties();
Object key = toPoolKey(properties);
DistributedJDBCBrokerFactory factory =
(DistributedJDBCBrokerFactory) getPooledFactoryForKey(key);
if (factory != null)
return factory;
factory = newInstance(cp);
pool(key, factory);
return factory;
}
/**
* Factory method for constructing a factory from a configuration.
*/
public static synchronized JDBCBrokerFactory getInstance(
JDBCConfiguration conf) {
Map properties = conf.toProperties(false);
Object key = toPoolKey(properties);
DistributedJDBCBrokerFactory factory =
(DistributedJDBCBrokerFactory) getPooledFactoryForKey(key);
if (factory != null)
return factory;
factory = new DistributedJDBCBrokerFactory(
(DistributedJDBCConfiguration) conf);
pool(key, factory);
return factory;
}
public DistributedJDBCBrokerFactory(DistributedJDBCConfiguration conf) {
super(conf);
}
@Override
public DistributedJDBCConfiguration getConfiguration() {
return (DistributedJDBCConfiguration)super.getConfiguration();
}
@Override
protected StoreManager newStoreManager() {
return new DistributedStoreManager(getConfiguration());
}
@Override
protected Object getFactoryInitializationBanner() {
return _loc.get("factory-init", new SliceVersion());
}
}

View File

@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.openjpa.slice.jdbc;
import java.util.concurrent.ExecutorService;
import javax.transaction.TransactionManager;
import org.apache.openjpa.jdbc.conf.JDBCConfiguration;
import org.apache.openjpa.slice.DistributedConfiguration;
import org.apache.openjpa.slice.Slice;
/**
* A distributed configuration that is a ordered collection of
* JDBCConfigurations.
*
* @author Pinaki Poddar
*
*/
public interface DistributedJDBCConfiguration extends JDBCConfiguration,
DistributedConfiguration {
/**
* Gets the master slice.
*/
Slice getMaster();
/**
* Gets the TransactionManager instance being used.
*/
TransactionManager getTransactionManagerInstance();
/**
* Gets the alias for TransactionManager being used.
*/
String getTransactionManager();
/**
* Gets the alias for ExecutorService being used.
*/
String getExecutorService();
/**
* Gets the ExecutorService being used.
*/
ExecutorService getExecutorServiceInstance();
}

View File

@ -0,0 +1,480 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.openjpa.slice.jdbc;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import javax.sql.DataSource;
import javax.sql.XADataSource;
import javax.transaction.TransactionManager;
import org.apache.openjpa.conf.OpenJPAConfiguration;
import org.apache.openjpa.jdbc.conf.JDBCConfiguration;
import org.apache.openjpa.jdbc.conf.JDBCConfigurationImpl;
import org.apache.openjpa.jdbc.schema.DataSourceFactory;
import org.apache.openjpa.lib.conf.BooleanValue;
import org.apache.openjpa.lib.conf.ConfigurationProvider;
import org.apache.openjpa.lib.conf.PluginValue;
import org.apache.openjpa.lib.conf.StringListValue;
import org.apache.openjpa.lib.conf.StringValue;
import org.apache.openjpa.lib.jdbc.DecoratingDataSource;
import org.apache.openjpa.lib.jdbc.DelegatingDataSource;
import org.apache.openjpa.lib.log.Log;
import org.apache.openjpa.lib.log.LogFactory;
import org.apache.openjpa.lib.log.LogFactoryImpl;
import org.apache.openjpa.lib.util.Localizer;
import org.apache.openjpa.slice.DistributedBrokerImpl;
import org.apache.openjpa.slice.DistributionPolicy;
import org.apache.openjpa.slice.ExecutorServiceValue;
import org.apache.openjpa.slice.Slice;
import org.apache.openjpa.slice.SliceVersion;
import org.apache.openjpa.util.UserException;
/**
* Implements a distributed configuration of JDBCStoreManagers.
*
* @author Pinaki Poddar
*
*/
public class DistributedJDBCConfigurationImpl extends JDBCConfigurationImpl
implements DistributedJDBCConfiguration {
private final List<Slice> _slices = new ArrayList<Slice>();
private List<String> _activeSliceNames = new ArrayList<String>();
private Slice _master;
private DecoratingDataSource virtualDataSource;
protected BooleanValue lenientPlugin;
protected StringValue masterPlugin;
protected StringListValue namesPlugin;
protected PluginValue txnMgrPlugin;
protected ExecutorServiceValue executorServicePlugin;
protected PluginValue distributionPolicyPlugin;
private static Localizer _loc =
Localizer.forPackage(DistributedJDBCConfigurationImpl.class);
public static final String PREFIX_SLICE = "slice.";
public static final String PREFIX_OPENJPA = "openjpa.";
/**
* Configure itself as well as underlying slices.
*
*/
public DistributedJDBCConfigurationImpl(ConfigurationProvider cp) {
super(true, false);
Map p = cp.getProperties();
String pUnit = getPersistenceUnitName(p);
setDiagnosticContext(pUnit);
Log log = getConfigurationLog();
log.info(_loc.get("config-init", SliceVersion.VERSION));
brokerPlugin.setString(DistributedBrokerImpl.class.getName());
distributionPolicyPlugin = addPlugin("DistributionPolicy", true);
distributionPolicyPlugin.setDynamic(true);
lenientPlugin = addBoolean("Lenient");
masterPlugin = addString("Master");
namesPlugin = addStringList("Names");
txnMgrPlugin = addPlugin("TransactionPolicy", true);
txnMgrPlugin.setAlias("default",
"org.apache.openjpa.slice.transaction.NaiveTransactionManager");
txnMgrPlugin.setAlias("xa",
"org.apache.openjpa.slice.transaction.DistributedTransactionManager");
txnMgrPlugin.setAlias("jndi",
"org.apache.openjpa.slice.transaction.LookUpTransactionManager");
txnMgrPlugin.setDefault("default");
txnMgrPlugin.setString("default");
executorServicePlugin = new ExecutorServiceValue();
addValue(executorServicePlugin);
setSlices(p);
}
private String getPersistenceUnitName(Map p) {
Object unit = p.get(PREFIX_OPENJPA+id.getProperty());
return (unit == null) ? "?" : unit.toString();
}
private void setDiagnosticContext(String unit) {
LogFactory logFactory = getLogFactory();
if (logFactory instanceof LogFactoryImpl) {
((LogFactoryImpl)logFactory).setDiagnosticContext(unit);
}
}
/**
* Gets the name of the active slices.
*/
public List<String> getActiveSliceNames() {
if (_activeSliceNames.isEmpty()) {
for (Slice slice:_slices)
if (slice.isActive())
_activeSliceNames.add(slice.getName());
}
return _activeSliceNames;
}
/**
* Gets the name of the available slices.
*/
public List<String> getAvailableSliceNames() {
List<String> result = new ArrayList<String>();
for (Slice slice:_slices)
result.add(slice.getName());
return result;
}
/**
* Gets the slices of given status. Null returns all irrespective of status.
*/
public List<Slice> getSlices(Slice.Status...statuses) {
if (statuses == null)
return Collections.unmodifiableList(_slices);
List<Slice> result = new ArrayList<Slice>();
for (Slice slice:_slices) {
for (Slice.Status status:statuses)
if (slice.getStatus().equals(status))
result.add(slice);
}
return result;
}
/**
* Gets the master slice.
*/
public Slice getMaster() {
return _master;
}
/**
* Get the configuration for given slice.
*/
public Slice getSlice(String name) {
for (Slice slice:_slices)
if (slice.getName().equals(name))
return slice;
throw new UserException(_loc.get("slice-not-found", name,
getActiveSliceNames()));
}
public DistributionPolicy getDistributionPolicyInstance() {
if (distributionPolicyPlugin.get() == null) {
distributionPolicyPlugin.instantiate(DistributionPolicy.class,
this, true);
}
return (DistributionPolicy) distributionPolicyPlugin.get();
}
public void setDistributionPolicyInstance(String val) {
distributionPolicyPlugin.set(val);
}
public Object getConnectionFactory() {
if (virtualDataSource == null) {
DistributedDataSource ds = createDistributedDataStore();
virtualDataSource =
DataSourceFactory.installDBDictionary(
getDBDictionaryInstance(), ds, this, false);
}
return virtualDataSource;
}
/**
* Create a virtual DistributedDataSource as a composite of individual
* slices as per configuration, optionally ignoring slices that can not be
* connected.
*/
private DistributedDataSource createDistributedDataStore() {
List<DataSource> dataSources = new ArrayList<DataSource>();
boolean isLenient = lenientPlugin.get();
boolean isXA = true;
for (Slice slice : _slices) {
JDBCConfiguration conf = (JDBCConfiguration)slice.getConfiguration();
Log log = conf.getConfigurationLog();
String url = getConnectionInfo(conf);
if (log.isInfoEnabled())
log.info(_loc.get("slice-connect", slice, url));
try {
DataSource ds = DataSourceFactory.newDataSource(conf, false);
DecoratingDataSource dds = new DecoratingDataSource(ds);
ds = DataSourceFactory.installDBDictionary(
conf.getDBDictionaryInstance(), dds, conf, false);
if (verifyDataSource(slice, ds)) {
dataSources.add(ds);
isXA &= isXACompliant(ds);
}
} catch (Throwable ex) {
handleBadConnection(isLenient, slice, ex);
}
}
DistributedDataSource result = new DistributedDataSource(dataSources);
return result;
}
String getConnectionInfo(OpenJPAConfiguration conf) {
String result = conf.getConnectionURL();
if (result == null) {
result = conf.getConnectionDriverName();
String props = conf.getConnectionProperties();
if (props != null)
result += "(" + props + ")";
}
return result;
}
boolean isXACompliant(DataSource ds) {
if (ds instanceof DelegatingDataSource)
return ((DelegatingDataSource) ds).getInnermostDelegate()
instanceof XADataSource;
return ds instanceof XADataSource;
}
/**
* Verify that a connection can be established to the given slice. If
* connection can not be established then slice is set to INACTIVE state.
*/
private boolean verifyDataSource(Slice slice, DataSource ds) {
Connection con = null;
try {
con = ds.getConnection();
slice.setStatus(Slice.Status.ACTIVE);
if (con == null) {
slice.setStatus(Slice.Status.INACTIVE);
return false;
}
return true;
} catch (SQLException ex) {
slice.setStatus(Slice.Status.INACTIVE);
return false;
} finally {
if (con != null)
try {
con.close();
} catch (SQLException ex) {
// ignore
}
}
}
/**
* Either throw a user exception or add the configuration to the given list,
* based on <code>isLenient</code>.
*/
private void handleBadConnection(boolean isLenient, Slice slice,
Throwable ex) {
OpenJPAConfiguration conf = slice.getConfiguration();
String url = conf.getConnectionURL();
Log log = getLog(LOG_RUNTIME);
if (isLenient) {
if (ex != null) {
log.warn(_loc.get("slice-connect-known-warn", slice, url, ex
.getCause()));
} else {
log.warn(_loc.get("slice-connect-warn", slice, url));
}
} else if (ex != null) {
throw new UserException(_loc.get("slice-connect-known-error",
slice, url, ex), ex.getCause());
} else {
throw new UserException(_loc.get("slice-connect-error", slice, url));
}
}
/**
* Create individual slices with configurations from the given properties.
*/
void setSlices(Map original) {
List<String> sliceNames = findSlices(original);
Log log = getConfigurationLog();
if (sliceNames.isEmpty()) {
throw new UserException(_loc.get("slice-none-configured"));
}
for (String key : sliceNames) {
JDBCConfiguration child = new JDBCConfigurationImpl();
child.fromProperties(createSliceProperties(original, key));
child.setId(PREFIX_SLICE + key);
Slice slice = new Slice(key, child);
_slices.add(slice);
if (log.isTraceEnabled())
log.trace(_loc.get("slice-configuration", key, child
.toProperties(false)));
}
setMaster();
}
/**
* Finds the slices. If <code>slice.Names</code> property is available
* then the slices are ordered in the way they are listed. Otherwise scans
* all available slices by looking for property of the form
* <code>slice.XYZ.abc</code> where <code>XYZ</code> is the slice
* identifier and <code>abc</code> is openjpa property name. The slices
* are then ordered alphabetically.
*/
private List<String> findSlices(Map p) {
List<String> sliceNames = new ArrayList<String>();
Log log = getConfigurationLog();
String key = PREFIX_SLICE+namesPlugin.getProperty();
boolean explicit = p.containsKey(key);
if (explicit) {
String[] values = p.get(key).toString().split("\\,");
for (String name:values)
if (!sliceNames.contains(name.trim()))
sliceNames.add(name.trim());
} else {
if (log.isWarnEnabled())
log.warn(_loc.get("no-slice-names"));
sliceNames = scanForSliceNames(p);
Collections.sort(sliceNames);
}
if (log.isInfoEnabled()) {
log.info(_loc.get("slice-available", sliceNames));
}
return sliceNames;
}
private List<String> scanForSliceNames(Map p) {
List<String> sliceNames = new ArrayList<String>();
for (Object o : p.keySet()) {
String key = o.toString();
if (key.startsWith(PREFIX_SLICE) && key.split("\\.").length > 2) {
String sliceName =
chopTail(chopHead(o.toString(), PREFIX_SLICE), ".");
if (!sliceNames.contains(sliceName))
sliceNames.add(sliceName);
}
}
return sliceNames;
}
static String chopHead(String s, String head) {
if (s.startsWith(head))
return s.substring(head.length());
return s;
}
static String chopTail(String s, String tail) {
int i = s.lastIndexOf(tail);
if (i == -1)
return s;
return s.substring(0, i);
}
/**
* Creates given <code>slice</code> specific configuration properties from
* given <code>original</code> key-value map. The rules are
* <LI> if key begins with <code>"slice.XXX."</code> where
* <code>XXX</code> is the given slice name, then replace
* <code>"slice.XXX.</code> with <code>openjpa.</code>.
* <LI>if key begins with <code>"slice."</code> but not with
* <code>"slice.XXX."</code>, the ignore i.e. any property of other
* slices or global slice property e.g.
* <code>slice.DistributionPolicy</code>
* <code>if key starts with <code>"openjpa."</code> and a corresponding
* <code>"slice.XXX."</code> property does not exist, then use this as
* default property
* <code>property with any other prefix is simply copied
*
*/
Map createSliceProperties(Map original, String slice) {
Map result = new Properties();
String prefix = PREFIX_SLICE + slice + ".";
for (Object o : original.keySet()) {
String key = o.toString();
if (key.startsWith(prefix)) {
String newKey = PREFIX_OPENJPA + key.substring(prefix.length());
result.put(newKey, original.get(o));
} else if (key.startsWith(PREFIX_SLICE)) {
// ignore keys that are in 'slice.' namespace but not this slice
} else if (key.startsWith(PREFIX_OPENJPA)) {
String newKey = prefix + key.substring(PREFIX_OPENJPA.length());
if (!original.containsKey(newKey))
result.put(key, original.get(o));
} else { // keys that are neither "openjpa" nor "slice" namespace
result.put(key, original.get(o));
}
}
return result;
}
/**
* Determine the master slice.
*/
private void setMaster() {
String masterSlice = masterPlugin.get();
Log log = getConfigurationLog();
List<Slice> activeSlices = getSlices(null);
if (masterSlice == null || masterSlice.length() == 0) {
_master = activeSlices.get(0);
if (log.isWarnEnabled())
log.warn(_loc.get("no-master-slice", _master));
return;
}
for (Slice slice:activeSlices)
if (slice.getName().equals(masterSlice))
_master = slice;
if (_master == null) {
_master = activeSlices.get(0);
}
}
public String getTransactionManager() {
return txnMgrPlugin.getString();
}
public void setTransactionManager(TransactionManager txnManager) {
txnMgrPlugin.set(txnManager);
}
public TransactionManager getTransactionManagerInstance() {
if (txnMgrPlugin.get() == null) {
txnMgrPlugin.instantiate(TransactionManager.class, this);
}
return (TransactionManager) txnMgrPlugin.get();
}
public String getExecutorService() {
return executorServicePlugin.getString();
}
public void setExecutorService(ExecutorService txnManager) {
executorServicePlugin.set(txnManager);
}
public ExecutorService getExecutorServiceInstance() {
if (executorServicePlugin.get() == null) {
executorServicePlugin.instantiate(ExecutorService.class, this);
}
return (ExecutorService) executorServicePlugin.get();
}
}

View File

@ -0,0 +1,429 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.openjpa.slice.jdbc;
import java.io.InputStream;
import java.io.Reader;
import java.math.BigDecimal;
import java.net.URL;
import java.sql.Array;
import java.sql.Blob;
import java.sql.Clob;
import java.sql.Date;
import java.sql.ParameterMetaData;
import java.sql.PreparedStatement;
import java.sql.Ref;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Time;
import java.sql.Timestamp;
import java.util.Calendar;
/**
* A virtual PreparedStaement that delegates to a set of actual PreparedStatements.
*
* @author Pinaki Poddar
*
*/
class DistributedPreparedStatement extends DistributedTemplate<PreparedStatement>
implements PreparedStatement {
DistributedPreparedStatement(DistributedConnection c) {
super(c);
}
public void clearParameters() throws SQLException {
for (PreparedStatement s : this)
s.clearParameters();
}
public boolean execute() throws SQLException {
boolean ret = true;
for (PreparedStatement s : this)
ret = s.execute() & ret;
return ret;
}
public ResultSet executeQuery() throws SQLException {
DistributedResultSet mrs = new DistributedResultSet();
for (PreparedStatement t : this)
mrs.add(t.executeQuery());
return mrs;
}
public int executeUpdate() throws SQLException {
int ret = 0;
for (PreparedStatement t : this)
ret += t.executeUpdate();
return ret;
}
public ResultSetMetaData getMetaData() throws SQLException {
return master.getMetaData();
}
public ParameterMetaData getParameterMetaData() throws SQLException {
throw new UnsupportedOperationException();
}
public void setArray(int i, Array x) throws SQLException {
for (PreparedStatement t : this)
t.setArray(i, x);
}
public void setAsciiStream(int arg0, InputStream arg1, int arg2)
throws SQLException {
for (PreparedStatement t : this)
t.setAsciiStream(arg0, arg1, arg2);
}
public void setBigDecimal(int arg0, BigDecimal arg1) throws SQLException {
for (PreparedStatement t : this)
t.setBigDecimal(arg0, arg1);
}
public void setBinaryStream(int arg0, InputStream arg1, int arg2)
throws SQLException {
for (PreparedStatement t : this)
t.setBinaryStream(arg0, arg1, arg2);
}
public void setBlob(int arg0, Blob arg1) throws SQLException {
for (PreparedStatement t : this)
t.setBlob(arg0, arg1);
}
public void setBoolean(int arg0, boolean arg1) throws SQLException {
for (PreparedStatement t : this)
t.setBoolean(arg0, arg1);
}
public void setByte(int arg0, byte arg1) throws SQLException {
for (PreparedStatement t : this)
t.setByte(arg0, arg1);
}
public void setBytes(int arg0, byte[] arg1) throws SQLException {
for (PreparedStatement t : this)
t.setBytes(arg0, arg1);
}
public void setCharacterStream(int arg0, Reader arg1, int arg2)
throws SQLException {
for (PreparedStatement t : this)
t.setCharacterStream(arg0, arg1, arg2);
}
public void setClob(int arg0, Clob arg1) throws SQLException {
for (PreparedStatement t : this)
t.setClob(arg0, arg1);
}
public void setDate(int arg0, Date arg1) throws SQLException {
for (PreparedStatement t : this)
t.setDate(arg0, arg1);
}
public void setDate(int arg0, Date arg1, Calendar arg2) throws SQLException {
for (PreparedStatement t : this)
t.setDate(arg0, arg1, arg2);
}
public void setDouble(int arg0, double arg1) throws SQLException {
for (PreparedStatement t : this)
t.setDouble(arg0, arg1);
}
public void setFloat(int arg0, float arg1) throws SQLException {
for (PreparedStatement t : this)
t.setFloat(arg0, arg1);
}
public void setInt(int arg0, int arg1) throws SQLException {
for (PreparedStatement t : this)
t.setInt(arg0, arg1);
}
public void setLong(int arg0, long arg1) throws SQLException {
for (PreparedStatement t : this)
t.setLong(arg0, arg1);
}
public void setNull(int arg0, int arg1) throws SQLException {
for (PreparedStatement t : this)
t.setNull(arg0, arg1);
}
public void setNull(int arg0, int arg1, String arg2) throws SQLException {
for (PreparedStatement t : this)
t.setNull(arg0, arg1, arg2);
}
public void setObject(int arg0, Object arg1) throws SQLException {
for (PreparedStatement t : this)
t.setObject(arg0, arg1);
}
public void setObject(int arg0, Object arg1, int arg2) throws SQLException {
for (PreparedStatement t : this)
t.setObject(arg0, arg1, arg2);
}
public void setObject(int arg0, Object arg1, int arg2, int arg3)
throws SQLException {
for (PreparedStatement t : this)
t.setObject(arg0, arg1, arg2, arg3);
}
public void setRef(int arg0, Ref arg1) throws SQLException {
for (PreparedStatement t : this)
t.setRef(arg0, arg1);
}
public void setShort(int arg0, short arg1) throws SQLException {
for (PreparedStatement t : this)
t.setShort(arg0, arg1);
}
public void setString(int arg0, String arg1) throws SQLException {
for (PreparedStatement t : this)
t.setString(arg0, arg1);
}
public void setTime(int arg0, Time arg1) throws SQLException {
for (PreparedStatement t : this)
t.setTime(arg0, arg1);
}
public void setTime(int arg0, Time arg1, Calendar arg2) throws
SQLException {
for (PreparedStatement t : this)
t.setTime(arg0, arg1, arg2);
}
public void setTimestamp(int arg0, Timestamp arg1) throws SQLException {
for (PreparedStatement t : this)
t.setTimestamp(arg0, arg1);
}
public void setTimestamp(int arg0, Timestamp arg1, Calendar arg2)
throws SQLException {
for (PreparedStatement t : this)
t.setTimestamp(arg0, arg1, arg2);
}
public void setURL(int arg0, URL arg1) throws SQLException {
for (PreparedStatement t : this)
t.setURL(arg0, arg1);
}
public void setUnicodeStream(int arg0, InputStream arg1, int arg2)
throws SQLException {
for (PreparedStatement t : this)
t.setUnicodeStream(arg0, arg1, arg2);
}
public void addBatch() throws SQLException {
for (PreparedStatement t:this)
t.addBatch();
}
// public void cancel() throws SQLException {
// // TODO Auto-generated method stub
//
// }
//
// public void clearBatch() throws SQLException {
// // TODO Auto-generated method stub
//
// }
//
// public void clearWarnings() throws SQLException {
// // TODO Auto-generated method stub
//
// }
//
// public void close() throws SQLException {
// // TODO Auto-generated method stub
//
// }
//
// public boolean execute(String arg0) throws SQLException {
// // TODO Auto-generated method stub
// return false;
// }
//
// public boolean execute(String arg0, int arg1) throws SQLException {
// // TODO Auto-generated method stub
// return false;
// }
//
// public boolean execute(String arg0, int[] arg1) throws SQLException {
// // TODO Auto-generated method stub
// return false;
// }
//
// public boolean execute(String arg0, String[] arg1) throws SQLException {
// // TODO Auto-generated method stub
// return false;
// }
//
// public int[] executeBatch() throws SQLException {
// // TODO Auto-generated method stub
// return null;
// }
//
// public ResultSet executeQuery(String arg0) throws SQLException {
// // TODO Auto-generated method stub
// return null;
// }
//
// public int executeUpdate(String arg0) throws SQLException {
// // TODO Auto-generated method stub
// return 0;
// }
//
// public int executeUpdate(String arg0, int arg1) throws SQLException {
// // TODO Auto-generated method stub
// return 0;
// }
//
// public int executeUpdate(String arg0, int[] arg1) throws SQLException {
// // TODO Auto-generated method stub
// return 0;
// }
//
// public int executeUpdate(String arg0, String[] arg1) throws SQLException
// {
// // TODO Auto-generated method stub
// return 0;
// }
//
// public Connection getConnection() throws SQLException {
// // TODO Auto-generated method stub
// return null;
// }
//
// public int getFetchDirection() throws SQLException {
// // TODO Auto-generated method stub
// return 0;
// }
//
// public int getFetchSize() throws SQLException {
// // TODO Auto-generated method stub
// return 0;
// }
//
// public ResultSet getGeneratedKeys() throws SQLException {
// // TODO Auto-generated method stub
// return null;
// }
//
// public int getMaxFieldSize() throws SQLException {
// // TODO Auto-generated method stub
// return 0;
// }
//
// public int getMaxRows() throws SQLException {
// // TODO Auto-generated method stub
// return 0;
// }
//
// public boolean getMoreResults() throws SQLException {
// // TODO Auto-generated method stub
// return false;
// }
//
// public boolean getMoreResults(int arg0) throws SQLException {
// // TODO Auto-generated method stub
// return false;
// }
//
// public int getQueryTimeout() throws SQLException {
// // TODO Auto-generated method stub
// return 0;
// }
//
// public ResultSet getResultSet() throws SQLException {
// // TODO Auto-generated method stub
// return null;
// }
//
// public int getResultSetConcurrency() throws SQLException {
// // TODO Auto-generated method stub
// return 0;
// }
//
// public int getResultSetHoldability() throws SQLException {
// // TODO Auto-generated method stub
// return 0;
// }
//
// public int getResultSetType() throws SQLException {
// // TODO Auto-generated method stub
// return 0;
// }
//
// public int getUpdateCount() throws SQLException {
// // TODO Auto-generated method stub
// return 0;
// }
//
// public SQLWarning getWarnings() throws SQLException {
// // TODO Auto-generated method stub
// return null;
// }
//
// public void setCursorName(String arg0) throws SQLException {
// // TODO Auto-generated method stub
//
// }
//
// public void setEscapeProcessing(boolean arg0) throws SQLException {
// // TODO Auto-generated method stub
//
// }
//
// public void setFetchDirection(int arg0) throws SQLException {
// // TODO Auto-generated method stub
//
// }
//
// public void setFetchSize(int arg0) throws SQLException {
// // TODO Auto-generated method stub
//
// }
//
// public void setMaxFieldSize(int arg0) throws SQLException {
// // TODO Auto-generated method stub
//
// }
//
// public void setMaxRows(int arg0) throws SQLException {
// // TODO Auto-generated method stub
//
// }
//
// public void setQueryTimeout(int arg0) throws SQLException {
// // TODO Auto-generated method stub
//
// }
}

View File

@ -0,0 +1,763 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.openjpa.slice.jdbc;
import java.io.InputStream;
import java.io.Reader;
import java.math.BigDecimal;
import java.net.URL;
import java.sql.Array;
import java.sql.Blob;
import java.sql.Clob;
import java.sql.Date;
import java.sql.Ref;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.SQLWarning;
import java.sql.Statement;
import java.sql.Time;
import java.sql.Timestamp;
import java.util.Calendar;
import java.util.LinkedList;
import java.util.Map;
/**
* A chain of ResultSet.
*
* @author Pinaki Poddar
*
*/
class DistributedResultSet implements ResultSet {
LinkedList<ResultSet> comps = new LinkedList<ResultSet>();
ResultSet current;
int cursor = -1;
/**
* Adds the ResultSet only if it has rows.
*/
public void add(ResultSet rs) {
try {
if (rs.first())
comps.add(rs);
} catch (SQLException e) {
// ignore
}
}
public boolean absolute(int arg0) throws SQLException {
throw new UnsupportedOperationException();
}
public void afterLast() throws SQLException {
current = null;
cursor = comps.size();
}
public void beforeFirst() throws SQLException {
current = null;
cursor = -1;
}
public void cancelRowUpdates() throws SQLException {
throw new UnsupportedOperationException();
}
public void clearWarnings() throws SQLException {
for (ResultSet rs:comps)
rs.clearWarnings();
}
public void close() throws SQLException {
for (ResultSet rs:comps)
rs.close();
}
public void deleteRow() throws SQLException {
current.deleteRow();
}
public int findColumn(String arg0) throws SQLException {
return 0;
}
public boolean first() throws SQLException {
if (comps.isEmpty()) return false;
cursor = 0;
current = comps.get(0);
return true;
}
public Array getArray(int arg0) throws SQLException {
return current.getArray(arg0);
}
public Array getArray(String arg0) throws SQLException {
return current.getArray(arg0);
}
public InputStream getAsciiStream(int arg0) throws SQLException {
return current.getAsciiStream(arg0);
}
public InputStream getAsciiStream(String arg0) throws SQLException {
return current.getAsciiStream(arg0);
}
public BigDecimal getBigDecimal(int arg0) throws SQLException {
return current.getBigDecimal(arg0);
}
public BigDecimal getBigDecimal(String arg0) throws SQLException {
return current.getBigDecimal(arg0);
}
public BigDecimal getBigDecimal(int arg0, int arg1) throws SQLException {
return current.getBigDecimal(arg0, arg1);
}
public BigDecimal getBigDecimal(String arg0, int arg1) throws SQLException {
return current.getBigDecimal(arg0, arg1);
}
public InputStream getBinaryStream(int arg0) throws SQLException {
return current.getBinaryStream(arg0);
}
public InputStream getBinaryStream(String arg0) throws SQLException {
return current.getBinaryStream(arg0);
}
public Blob getBlob(int arg0) throws SQLException {
return current.getBlob(arg0);
}
public Blob getBlob(String arg0) throws SQLException {
// TODO Auto-generated method stub
return null;
}
public boolean getBoolean(int arg0) throws SQLException {
// TODO Auto-generated method stub
return false;
}
public boolean getBoolean(String arg0) throws SQLException {
// TODO Auto-generated method stub
return false;
}
public byte getByte(int arg0) throws SQLException {
// TODO Auto-generated method stub
return 0;
}
public byte getByte(String arg0) throws SQLException {
// TODO Auto-generated method stub
return 0;
}
public byte[] getBytes(int arg0) throws SQLException {
// TODO Auto-generated method stub
return null;
}
public byte[] getBytes(String arg0) throws SQLException {
// TODO Auto-generated method stub
return null;
}
public Reader getCharacterStream(int arg0) throws SQLException {
// TODO Auto-generated method stub
return null;
}
public Reader getCharacterStream(String arg0) throws SQLException {
// TODO Auto-generated method stub
return null;
}
public Clob getClob(int arg0) throws SQLException {
// TODO Auto-generated method stub
return null;
}
public Clob getClob(String arg0) throws SQLException {
// TODO Auto-generated method stub
return null;
}
public int getConcurrency() throws SQLException {
// TODO Auto-generated method stub
return 0;
}
public String getCursorName() throws SQLException {
// TODO Auto-generated method stub
return null;
}
public Date getDate(int arg0) throws SQLException {
// TODO Auto-generated method stub
return null;
}
public Date getDate(String arg0) throws SQLException {
// TODO Auto-generated method stub
return null;
}
public Date getDate(int arg0, Calendar arg1) throws SQLException {
// TODO Auto-generated method stub
return null;
}
public Date getDate(String arg0, Calendar arg1) throws SQLException {
// TODO Auto-generated method stub
return null;
}
public double getDouble(int arg0) throws SQLException {
// TODO Auto-generated method stub
return 0;
}
public double getDouble(String arg0) throws SQLException {
// TODO Auto-generated method stub
return 0;
}
public int getFetchDirection() throws SQLException {
// TODO Auto-generated method stub
return 0;
}
public int getFetchSize() throws SQLException {
// TODO Auto-generated method stub
return 0;
}
public float getFloat(int arg0) throws SQLException {
// TODO Auto-generated method stub
return 0;
}
public float getFloat(String arg0) throws SQLException {
// TODO Auto-generated method stub
return 0;
}
public int getInt(int arg0) throws SQLException {
// TODO Auto-generated method stub
return 0;
}
public int getInt(String arg0) throws SQLException {
// TODO Auto-generated method stub
return 0;
}
public long getLong(int arg0) throws SQLException {
// TODO Auto-generated method stub
return 0;
}
public long getLong(String arg0) throws SQLException {
// TODO Auto-generated method stub
return 0;
}
public ResultSetMetaData getMetaData() throws SQLException {
// TODO Auto-generated method stub
return null;
}
public Object getObject(int arg0) throws SQLException {
// TODO Auto-generated method stub
return null;
}
public Object getObject(String arg0) throws SQLException {
// TODO Auto-generated method stub
return null;
}
public Object getObject(int arg0, Map<String, Class<?>> arg1)
throws SQLException {
// TODO Auto-generated method stub
return null;
}
public Object getObject(String arg0, Map<String, Class<?>> arg1)
throws SQLException {
// TODO Auto-generated method stub
return null;
}
public Ref getRef(int arg0) throws SQLException {
// TODO Auto-generated method stub
return null;
}
public Ref getRef(String arg0) throws SQLException {
// TODO Auto-generated method stub
return null;
}
public int getRow() throws SQLException {
// TODO Auto-generated method stub
return 0;
}
public short getShort(int arg0) throws SQLException {
// TODO Auto-generated method stub
return 0;
}
public short getShort(String arg0) throws SQLException {
// TODO Auto-generated method stub
return 0;
}
public Statement getStatement() throws SQLException {
// TODO Auto-generated method stub
return null;
}
public String getString(int arg0) throws SQLException {
// TODO Auto-generated method stub
return null;
}
public String getString(String arg0) throws SQLException {
// TODO Auto-generated method stub
return null;
}
public Time getTime(int arg0) throws SQLException {
// TODO Auto-generated method stub
return null;
}
public Time getTime(String arg0) throws SQLException {
// TODO Auto-generated method stub
return null;
}
public Time getTime(int arg0, Calendar arg1) throws SQLException {
// TODO Auto-generated method stub
return null;
}
public Time getTime(String arg0, Calendar arg1) throws SQLException {
// TODO Auto-generated method stub
return null;
}
public Timestamp getTimestamp(int arg0) throws SQLException {
// TODO Auto-generated method stub
return null;
}
public Timestamp getTimestamp(String arg0) throws SQLException {
// TODO Auto-generated method stub
return null;
}
public Timestamp getTimestamp(int arg0, Calendar arg1) throws SQLException {
// TODO Auto-generated method stub
return null;
}
public Timestamp getTimestamp(String arg0, Calendar arg1)
throws SQLException {
// TODO Auto-generated method stub
return null;
}
public int getType() throws SQLException {
// TODO Auto-generated method stub
return 0;
}
public URL getURL(int arg0) throws SQLException {
// TODO Auto-generated method stub
return null;
}
public URL getURL(String arg0) throws SQLException {
// TODO Auto-generated method stub
return null;
}
public InputStream getUnicodeStream(int arg0) throws SQLException {
// TODO Auto-generated method stub
return null;
}
public InputStream getUnicodeStream(String arg0) throws SQLException {
// TODO Auto-generated method stub
return null;
}
public SQLWarning getWarnings() throws SQLException {
// TODO Auto-generated method stub
return null;
}
public void insertRow() throws SQLException {
// TODO Auto-generated method stub
}
public boolean isAfterLast() throws SQLException {
return current == null && cursor >= comps.size();
}
public boolean isBeforeFirst() throws SQLException {
return current == null && cursor<0;
}
public boolean isFirst() throws SQLException {
return current != null && current.isFirst() && cursor==0;
}
public boolean isLast() throws SQLException {
return current != null && current.isLast() && cursor==comps.size()-1;
}
public boolean last() throws SQLException {
if (comps.isEmpty()) return false;
cursor = comps.size()-1;
return false;
}
public void moveToCurrentRow() throws SQLException {
// TODO Auto-generated method stub
}
public void moveToInsertRow() throws SQLException {
// TODO Auto-generated method stub
}
public boolean next() throws SQLException {
if (current == null) {
current = comps.get(0);
cursor = 0;
}
if (current.next())
return true;
cursor++;
if (cursor<comps.size())
current = comps.get(cursor);
return cursor<comps.size();
}
public boolean previous() throws SQLException {
return current.previous();
}
public void refreshRow() throws SQLException {
// TODO Auto-generated method stub
}
public boolean relative(int arg0) throws SQLException {
// TODO Auto-generated method stub
return false;
}
public boolean rowDeleted() throws SQLException {
// TODO Auto-generated method stub
return false;
}
public boolean rowInserted() throws SQLException {
// TODO Auto-generated method stub
return false;
}
public boolean rowUpdated() throws SQLException {
// TODO Auto-generated method stub
return false;
}
public void setFetchDirection(int arg0) throws SQLException {
// TODO Auto-generated method stub
}
public void setFetchSize(int arg0) throws SQLException {
// TODO Auto-generated method stub
}
public void updateArray(int arg0, Array arg1) throws SQLException {
// TODO Auto-generated method stub
}
public void updateArray(String arg0, Array arg1) throws SQLException {
// TODO Auto-generated method stub
}
public void updateAsciiStream(int arg0, InputStream arg1, int arg2)
throws SQLException {
// TODO Auto-generated method stub
}
public void updateAsciiStream(String arg0, InputStream arg1, int arg2)
throws SQLException {
// TODO Auto-generated method stub
}
public void updateBigDecimal(int arg0, BigDecimal arg1) throws SQLException {
// TODO Auto-generated method stub
}
public void updateBigDecimal(String arg0, BigDecimal arg1)
throws SQLException {
// TODO Auto-generated method stub
}
public void updateBinaryStream(int arg0, InputStream arg1, int arg2)
throws SQLException {
// TODO Auto-generated method stub
}
public void updateBinaryStream(String arg0, InputStream arg1, int arg2)
throws SQLException {
// TODO Auto-generated method stub
}
public void updateBlob(int arg0, Blob arg1) throws SQLException {
// TODO Auto-generated method stub
}
public void updateBlob(String arg0, Blob arg1) throws SQLException {
// TODO Auto-generated method stub
}
public void updateBoolean(int arg0, boolean arg1) throws SQLException {
// TODO Auto-generated method stub
}
public void updateBoolean(String arg0, boolean arg1) throws SQLException {
// TODO Auto-generated method stub
}
public void updateByte(int arg0, byte arg1) throws SQLException {
// TODO Auto-generated method stub
}
public void updateByte(String arg0, byte arg1) throws SQLException {
// TODO Auto-generated method stub
}
public void updateBytes(int arg0, byte[] arg1) throws SQLException {
// TODO Auto-generated method stub
}
public void updateBytes(String arg0, byte[] arg1) throws SQLException {
// TODO Auto-generated method stub
}
public void updateCharacterStream(int arg0, Reader arg1, int arg2)
throws SQLException {
// TODO Auto-generated method stub
}
public void updateCharacterStream(String arg0, Reader arg1, int arg2)
throws SQLException {
// TODO Auto-generated method stub
}
public void updateClob(int arg0, Clob arg1) throws SQLException {
// TODO Auto-generated method stub
}
public void updateClob(String arg0, Clob arg1) throws SQLException {
// TODO Auto-generated method stub
}
public void updateDate(int arg0, Date arg1) throws SQLException {
// TODO Auto-generated method stub
}
public void updateDate(String arg0, Date arg1) throws SQLException {
// TODO Auto-generated method stub
}
public void updateDouble(int arg0, double arg1) throws SQLException {
// TODO Auto-generated method stub
}
public void updateDouble(String arg0, double arg1) throws SQLException {
// TODO Auto-generated method stub
}
public void updateFloat(int arg0, float arg1) throws SQLException {
// TODO Auto-generated method stub
}
public void updateFloat(String arg0, float arg1) throws SQLException {
// TODO Auto-generated method stub
}
public void updateInt(int arg0, int arg1) throws SQLException {
// TODO Auto-generated method stub
}
public void updateInt(String arg0, int arg1) throws SQLException {
// TODO Auto-generated method stub
}
public void updateLong(int arg0, long arg1) throws SQLException {
// TODO Auto-generated method stub
}
public void updateLong(String arg0, long arg1) throws SQLException {
// TODO Auto-generated method stub
}
public void updateNull(int arg0) throws SQLException {
// TODO Auto-generated method stub
}
public void updateNull(String arg0) throws SQLException {
// TODO Auto-generated method stub
}
public void updateObject(int arg0, Object arg1) throws SQLException {
// TODO Auto-generated method stub
}
public void updateObject(String arg0, Object arg1) throws SQLException {
// TODO Auto-generated method stub
}
public void updateObject(int arg0, Object arg1, int arg2)
throws SQLException {
// TODO Auto-generated method stub
}
public void updateObject(String arg0, Object arg1, int arg2)
throws SQLException {
// TODO Auto-generated method stub
}
public void updateRef(int arg0, Ref arg1) throws SQLException {
// TODO Auto-generated method stub
}
public void updateRef(String arg0, Ref arg1) throws SQLException {
// TODO Auto-generated method stub
}
public void updateRow() throws SQLException {
// TODO Auto-generated method stub
}
public void updateShort(int arg0, short arg1) throws SQLException {
// TODO Auto-generated method stub
}
public void updateShort(String arg0, short arg1) throws SQLException {
// TODO Auto-generated method stub
}
public void updateString(int arg0, String arg1) throws SQLException {
// TODO Auto-generated method stub
}
public void updateString(String arg0, String arg1) throws SQLException {
// TODO Auto-generated method stub
}
public void updateTime(int arg0, Time arg1) throws SQLException {
// TODO Auto-generated method stub
}
public void updateTime(String arg0, Time arg1) throws SQLException {
// TODO Auto-generated method stub
}
public void updateTimestamp(int arg0, Timestamp arg1) throws SQLException {
// TODO Auto-generated method stub
}
public void updateTimestamp(String arg0, Timestamp arg1)
throws SQLException {
// TODO Auto-generated method stub
}
public boolean wasNull() throws SQLException {
// TODO Auto-generated method stub
return false;
}
}

View File

@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.openjpa.slice.jdbc;
import java.sql.Statement;
/**
* A virtual Statement that delegates to many actual Statements.
*
* @author Pinaki Poddar
*
*/
class DistributedStatement extends DistributedTemplate<Statement> {
public DistributedStatement(DistributedConnection c) {
super(c);
}
}

View File

@ -0,0 +1,494 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.openjpa.slice.jdbc;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import javax.naming.ConfigurationException;
import javax.transaction.Transaction;
import javax.transaction.TransactionManager;
import org.apache.openjpa.conf.OpenJPAConfiguration;
import org.apache.openjpa.enhance.PersistenceCapable;
import org.apache.openjpa.jdbc.conf.JDBCConfiguration;
import org.apache.openjpa.jdbc.kernel.ConnectionInfo;
import org.apache.openjpa.jdbc.kernel.JDBCStore;
import org.apache.openjpa.jdbc.kernel.JDBCStoreManager;
import org.apache.openjpa.jdbc.sql.Result;
import org.apache.openjpa.jdbc.sql.ResultSetResult;
import org.apache.openjpa.kernel.FetchConfiguration;
import org.apache.openjpa.kernel.OpenJPAStateManager;
import org.apache.openjpa.kernel.PCState;
import org.apache.openjpa.kernel.QueryLanguages;
import org.apache.openjpa.kernel.Seq;
import org.apache.openjpa.kernel.StoreContext;
import org.apache.openjpa.kernel.StoreManager;
import org.apache.openjpa.kernel.StoreQuery;
import org.apache.openjpa.kernel.exps.ExpressionParser;
import org.apache.openjpa.lib.log.Log;
import org.apache.openjpa.lib.rop.MergedResultObjectProvider;
import org.apache.openjpa.lib.rop.ResultObjectProvider;
import org.apache.openjpa.lib.util.Localizer;
import org.apache.openjpa.meta.ClassMetaData;
import org.apache.openjpa.meta.FieldMetaData;
import org.apache.openjpa.slice.DistributionPolicy;
import org.apache.openjpa.slice.transaction.DistributedNaiveTransaction;
import org.apache.openjpa.slice.transaction.DistributedTransactionManager;
import org.apache.openjpa.slice.transaction.NaiveTransactionManager;
import org.apache.openjpa.util.InternalException;
import org.apache.openjpa.util.StoreException;
import org.apache.openjpa.util.UserException;
/**
* A Store manager for multiple physical databases referred as <em>slice</em>.
* This receiver behaves like a Transaction Manager as it implements two-phase
* commit protocol if all the component slices is XA-complaint. The actions are
* delegated to the underlying slices. The actions are executed in parallel
* threads whenever possible such as flushing or query. <br>
*
* @author Pinaki Poddar
*
*/
class DistributedStoreManager extends JDBCStoreManager {
private final List<SliceStoreManager> _slices;
private JDBCStoreManager _master;
private boolean isXA;
private TransactionManager _tm;
private final DistributedJDBCConfiguration _conf;
private Log _log;
private static final Localizer _loc =
Localizer.forPackage(DistributedStoreManager.class);
private static ExecutorService threadPool = Executors.newCachedThreadPool();
/**
* Constructs a set of child StoreManagers each connected to a physical
* DataSource.
*
* The supplied configuration carries multiple URL for underlying physical
* slices. The first slice is referred as <em>master</em> and is used to
* get Sequence based entity identifiers.
*/
public DistributedStoreManager(DistributedJDBCConfiguration conf) {
super();
_conf = conf;
_log = conf.getLog(OpenJPAConfiguration.LOG_RUNTIME);
_slices = new ArrayList<SliceStoreManager>();
for (String name : conf.getActiveSliceNames()) {
SliceStoreManager slice = new SliceStoreManager
(conf.getSlice(name));
_slices.add(slice);
if (slice.getName().equals(conf.getMaster().getName()))
_master = slice;
}
}
public DistributedJDBCConfiguration getConfiguration() {
return _conf;
}
/**
* Decides the index of the StoreManager by first looking at the
* implementation data. If not found then {@link DistributionPolicy
* DistributionPolicy} determines the target store for new instances and
* additional connection info is used to estimate for the existing
* instances.
*/
protected String findSliceName(OpenJPAStateManager sm, Object info) {
boolean hasIndex = hasSlice(sm);
if (hasIndex)
return sm.getImplData().toString();
String slice = estimateSlice(sm, info);
if (slice == null)
return assignSlice(sm);
return slice;
}
private boolean hasSlice(OpenJPAStateManager sm) {
return sm.getImplData() != null;
}
private String assignSlice(OpenJPAStateManager sm) {
PersistenceCapable pc = sm.getPersistenceCapable();
String slice =
_conf.getDistributionPolicyInstance().distribute(pc,
_conf.getActiveSliceNames(), getContext());
if (!_conf.getActiveSliceNames().contains(slice)) {
throw new UserException(_loc.get("bad-policy-slice", new Object[] {
_conf.getDistributionPolicyInstance().getClass().getName(),
slice, sm.getPersistenceCapable(),
_conf.getActiveSliceNames() }));
}
sm.setImplData(slice, true);
return slice;
}
/**
* The additional edata is used, if possible, to find the StoreManager
* managing the given StateManager. If the additional data is unavailable
* then return null.
*
*/
private String estimateSlice(OpenJPAStateManager sm, Object edata) {
if (edata == null || !(edata instanceof ConnectionInfo))
return null;
Result result = ((ConnectionInfo) edata).result;
if (result instanceof ResultSetResult) {
JDBCStore store = ((ResultSetResult) result).getStore();
for (SliceStoreManager slice : _slices) {
if (slice == store) {
String sliceId = slice.getName();
sm.setImplData(sliceId, true);
return sliceId;
}
}
}
return null;
}
/**
* Selects a child StoreManager where the given instance resides.
*/
private StoreManager selectStore(OpenJPAStateManager sm, Object edata) {
String name = findSliceName(sm, edata);
SliceStoreManager slice = lookup(name);
if (slice == null)
throw new InternalException(_loc.get("wrong-slice", name, sm));
return slice;
}
public boolean assignField(OpenJPAStateManager sm, int field,
boolean preFlush) {
return selectStore(sm, null).assignField(sm, field, preFlush);
}
public boolean assignObjectId(OpenJPAStateManager sm, boolean preFlush) {
return _master.assignObjectId(sm, preFlush);
}
public void beforeStateChange(OpenJPAStateManager sm, PCState fromState,
PCState toState) {
_master.beforeStateChange(sm, fromState, toState);
}
public void begin() {
TransactionManager tm = getTransactionManager();
for (SliceStoreManager slice : _slices) {
try {
Transaction txn = tm.getTransaction();
if (isXA) {
txn.enlistResource(slice.getXAConnection().getXAResource());
} else { // This is the only place where casting to our
// internal implementation classes become necessary
((DistributedNaiveTransaction) txn).enlistResource(slice);
}
} catch (Exception e) {
throw new InternalException(e);
}
}
try {
tm.begin();
} catch (Exception e) {
throw new StoreException(e);
}
}
Log getLog(SliceStoreManager slice) {
return slice.getConfiguration()
.getLog(OpenJPAConfiguration.LOG_RUNTIME);
}
public void beginOptimistic() {
for (SliceStoreManager slice : _slices)
slice.beginOptimistic();
}
public boolean cancelAll() {
boolean ret = true;
for (SliceStoreManager slice : _slices)
ret = slice.cancelAll() & ret;
return ret;
}
public void close() {
for (SliceStoreManager slice : _slices)
slice.close();
}
public void commit() {
TransactionManager tm = getTransactionManager();
try {
tm.commit();
} catch (Exception e) {
throw new StoreException(e);
}
}
public int compareVersion(OpenJPAStateManager sm, Object v1, Object v2) {
return selectStore(sm, null).compareVersion(sm, v1, v2);
}
public Object copyDataStoreId(Object oid, ClassMetaData meta) {
return _master.copyDataStoreId(oid, meta);
}
public ResultObjectProvider executeExtent(ClassMetaData meta,
boolean subclasses, FetchConfiguration fetch) {
ResultObjectProvider[] tmp = new ResultObjectProvider[_slices.size()];
int i = 0;
for (SliceStoreManager slice : _slices) {
tmp[i++] = slice.executeExtent(meta, subclasses, fetch);
}
return new MergedResultObjectProvider(tmp);
}
public boolean exists(OpenJPAStateManager sm, Object edata) {
for (SliceStoreManager slice : _slices) {
if (slice.exists(sm, edata)) {
sm.setImplData(slice.getName(), true);
return true;
}
}
return false;
}
/**
* Flush the given StateManagers after binning them to respective physical
* slices.
*/
public Collection flush(Collection sms) {
Collection exceptions = new ArrayList();
List<Future<Collection>> futures = new ArrayList<Future<Collection>>();
Map<String, List<OpenJPAStateManager>> subsets = bin(sms, null);
for (SliceStoreManager slice : _slices) {
List<OpenJPAStateManager> subset = subsets.get(slice.getName());
if (subset.isEmpty())
continue;
futures.add(threadPool.submit(new Flusher(slice, subset)));
}
for (Future<Collection> future : futures) {
Collection error;
try {
error = future.get();
if (!(error == null || error.isEmpty())) {
exceptions.addAll(error);
}
} catch (InterruptedException e) {
throw new StoreException(e);
} catch (ExecutionException e) {
throw new StoreException(e.getCause());
}
}
return exceptions;
}
/**
* Separate the given list of StateManagers in separate lists for each slice
* by the associated slice identifier of each StateManager.
* @param sms
* @param edata
* @return
*/
private Map<String, List<OpenJPAStateManager>> bin(
Collection/*<StateManage>*/ sms, Object edata) {
Map<String, List<OpenJPAStateManager>> subsets =
new HashMap<String, List<OpenJPAStateManager>>();
for (SliceStoreManager slice : _slices)
subsets.put(slice.getName(), new ArrayList<OpenJPAStateManager>());
for (Object x : sms) {
OpenJPAStateManager sm = (OpenJPAStateManager) x;
String slice = findSliceName(sm, edata);
subsets.get(slice).add(sm);
}
return subsets;
}
public Object getClientConnection() {
throw new UnsupportedOperationException();
}
public Seq getDataStoreIdSequence(ClassMetaData forClass) {
return _master.getDataStoreIdSequence(forClass);
}
public Class getDataStoreIdType(ClassMetaData meta) {
return _master.getDataStoreIdType(meta);
}
public Class getManagedType(Object oid) {
return _master.getManagedType(oid);
}
public Seq getValueSequence(FieldMetaData forField) {
return _master.getValueSequence(forField);
}
public boolean initialize(OpenJPAStateManager sm, PCState state,
FetchConfiguration fetch, Object edata) {
if (edata instanceof ConnectionInfo) {
String slice = findSliceName(sm, (ConnectionInfo) edata);
if (slice != null)
return lookup(slice).initialize(sm, state, fetch, edata);
}
// not a part of Query result load. Look into the slices till found
for (SliceStoreManager slice : _slices) {
if (slice.initialize(sm, state, fetch, edata)) {
sm.setImplData(slice.getName(), true);
return true;
}
}
return false;
}
public boolean load(OpenJPAStateManager sm, BitSet fields,
FetchConfiguration fetch, int lockLevel, Object edata) {
return selectStore(sm, edata).load(sm, fields, fetch, lockLevel, edata);
}
public Collection loadAll(Collection sms, PCState state, int load,
FetchConfiguration fetch, Object edata) {
Map<String, List<OpenJPAStateManager>> subsets = bin(sms, edata);
Collection result = new ArrayList();
for (SliceStoreManager slice : _slices) {
List<OpenJPAStateManager> subset = subsets.get(slice.getName());
if (subset.isEmpty())
continue;
Collection tmp = slice.loadAll(subset, state, load, fetch, edata);
if (tmp != null && !tmp.isEmpty())
result.addAll(tmp);
}
return result;
}
public Object newDataStoreId(Object oidVal, ClassMetaData meta) {
return _master.newDataStoreId(oidVal, meta);
}
public FetchConfiguration newFetchConfiguration() {
return _master.newFetchConfiguration();
}
/**
* Construct a distributed query to be executed against all the slices.
*/
public StoreQuery newQuery(String language) {
ExpressionParser parser = QueryLanguages.parserForLanguage(language);
DistributedStoreQuery ret = new DistributedStoreQuery(this, parser);
for (SliceStoreManager slice : _slices) {
ret.add(slice.newQuery(language));
}
return ret;
}
public void releaseConnection() {
for (SliceStoreManager slice : _slices)
slice.releaseConnection();
}
public void retainConnection() {
for (SliceStoreManager slice : _slices)
slice.retainConnection();
}
public void rollback() {
TransactionManager tm = getTransactionManager();
try {
tm.rollback();
} catch (Exception e) {
throw new StoreException(e);
}
}
public void rollbackOptimistic() {
for (SliceStoreManager slice : _slices)
slice.rollbackOptimistic();
}
/**
* Sets the context for this receiver and all its underlying slices.
*/
public void setContext(StoreContext ctx) {
super.setContext(ctx);
isXA = true;
for (SliceStoreManager store : _slices) {
store.setContext(ctx,
(JDBCConfiguration)store.getSlice().getConfiguration());
isXA &= store.isXAEnabled();
}
_tm = getTransactionManager();
}
private SliceStoreManager lookup(String name) {
for (SliceStoreManager slice : _slices)
if (slice.getName().equals(name))
return slice;
return null;
}
public boolean syncVersion(OpenJPAStateManager sm, Object edata) {
return selectStore(sm, edata).syncVersion(sm, edata);
}
protected TransactionManager getTransactionManager() {
if (_tm == null) {
_tm = getConfiguration().getTransactionManagerInstance();
String alias = getConfiguration().getTransactionManager();
boolean is2pc = !(_tm instanceof NaiveTransactionManager);
if (isXA) {
if (!is2pc) {
_log.warn(_loc.get("resource-xa-tm-not-2pc", alias));
isXA = false;
}
} else if (is2pc) {
throw new UserException(_loc.get("resource-not-xa-tm-2pc",
alias));
}
}
return _tm;
}
private static class Flusher implements Callable<Collection> {
final SliceStoreManager store;
final Collection toFlush;
Flusher(SliceStoreManager store, Collection toFlush) {
this.store = store;
this.toFlush = toFlush;
}
public Collection call() throws Exception {
return store.flush(toFlush);
}
}
}

View File

@ -0,0 +1,231 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.openjpa.slice.jdbc;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.openjpa.jdbc.kernel.JDBCStore;
import org.apache.openjpa.jdbc.kernel.JDBCStoreQuery;
import org.apache.openjpa.kernel.ExpressionStoreQuery;
import org.apache.openjpa.kernel.OrderingMergedResultObjectProvider;
import org.apache.openjpa.kernel.QueryContext;
import org.apache.openjpa.kernel.StoreQuery;
import org.apache.openjpa.kernel.exps.ExpressionParser;
import org.apache.openjpa.lib.rop.MergedResultObjectProvider;
import org.apache.openjpa.lib.rop.ResultObjectProvider;
import org.apache.openjpa.meta.ClassMetaData;
import org.apache.openjpa.util.StoreException;
/**
* A query for distributed databases.
*
* @author Pinaki Poddar
*
*/
@SuppressWarnings("serial")
class DistributedStoreQuery extends JDBCStoreQuery {
private List<StoreQuery> _queries = new ArrayList<StoreQuery>();
private ExpressionParser _parser;
public DistributedStoreQuery(JDBCStore store, ExpressionParser parser) {
super(store, parser);
_parser = parser;
}
void add(StoreQuery q) {
_queries.add(q);
}
public Executor newDataStoreExecutor(ClassMetaData meta, boolean subs) {
ParallelExecutor ex = new ParallelExecutor(this, meta, subs, _parser,
ctx.getCompilation());
for (StoreQuery q:_queries) {
ex.addExecutor(q.newDataStoreExecutor(meta, subs));
}
return ex;
}
public void setContext(QueryContext ctx) {
super.setContext(ctx);
for (StoreQuery q:_queries)
q.setContext(ctx);
}
public ExecutorService getExecutorServiceInstance() {
DistributedJDBCConfiguration conf =
((DistributedJDBCConfiguration)getStore().getConfiguration());
return conf.getExecutorServiceInstance();
}
/**
* Executes queries on multiple databases.
*
* @author Pinaki Poddar
*
*/
public static class ParallelExecutor extends
ExpressionStoreQuery.DataStoreExecutor {
private List<Executor> executors = new ArrayList<Executor>();
private DistributedStoreQuery owner = null;
private ExecutorService threadPool = null;
public void addExecutor(Executor ex) {
executors.add(ex);
}
public ParallelExecutor(DistributedStoreQuery dsq, ClassMetaData meta,
boolean subclasses, ExpressionParser parser, Object parsed) {
super(dsq, meta, subclasses, parser, parsed);
owner = dsq;
threadPool = ((DistributedJDBCConfiguration)dsq.getStore()
.getConfiguration()).getExecutorServiceInstance();
}
/**
* Each child query must be executed with slice context and not the
* given query context.
*/
public ResultObjectProvider executeQuery(StoreQuery q,
final Object[] params, final Range range) {
ResultObjectProvider[] tmp = new ResultObjectProvider[executors.size()];
final Iterator<StoreQuery> qs = owner._queries.iterator();
final List<Future<ResultObjectProvider>> futures =
new ArrayList<Future<ResultObjectProvider>>();
int i = 0;
for (Executor ex:executors) {
QueryExecutor call = new QueryExecutor();
call.executor = ex;
call.query = qs.next();
call.params = params;
call.range = range;
futures.add(threadPool.submit(call));
}
for (Future<ResultObjectProvider> future:futures) {
try {
tmp[i++] = future.get();
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (ExecutionException e) {
throw new StoreException(e.getCause());
}
}
boolean[] ascending = getAscending(q);
boolean isAscending = ascending.length > 0;
boolean isUnique = q.getContext().isUnique();
if (isUnique) {
return new UniqueResultObjectProvider(tmp, q, getQueryExpressions());
}
if (isAscending) {
return new OrderingMergedResultObjectProvider(tmp, ascending,
(Executor[])executors.toArray(new Executor[executors.size()]),
q, params);
}
return new MergedResultObjectProvider(tmp);
}
public Number executeDelete(StoreQuery q, Object[] params) {
Iterator<StoreQuery> qs = owner._queries.iterator();
final List<Future<Number>> futures = new ArrayList<Future<Number>>();
for (Executor ex:executors) {
DeleteExecutor call = new DeleteExecutor();
call.executor = ex;
call.query = qs.next();
call.params = params;
futures.add(threadPool.submit(call));
}
int N = 0;
for (Future<Number> future:futures) {
try {
Number n = future.get();
if (n != null)
N += n.intValue();
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (ExecutionException e) {
throw new StoreException(e.getCause());
}
}
return new Integer(N);
}
public Number executeUpdate(StoreQuery q, Object[] params) {
Iterator<StoreQuery> qs = owner._queries.iterator();
final List<Future<Number>> futures = new ArrayList<Future<Number>>();
for (Executor ex:executors) {
UpdateExecutor call = new UpdateExecutor();
call.executor = ex;
call.query = qs.next();
call.params = params;
futures.add(threadPool.submit(call));
}
int N = 0;
for (Future<Number> future:futures) {
try {
Number n = future.get();
if (n != null)
N += n.intValue();
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (ExecutionException e) {
throw new StoreException(e.getCause());
}
}
return new Integer(N);
}
}
static class QueryExecutor implements Callable<ResultObjectProvider> {
StoreQuery query;
Executor executor;
Object[] params;
Range range;
public ResultObjectProvider call() throws Exception {
return executor.executeQuery(query, params, range);
}
}
static class DeleteExecutor implements Callable<Number> {
StoreQuery query;
Executor executor;
Object[] params;
public Number call() throws Exception {
return executor.executeDelete(query, params);
}
}
static class UpdateExecutor implements Callable<Number> {
StoreQuery query;
Executor executor;
Object[] params;
public Number call() throws Exception {
return executor.executeDelete(query, params);
}
}
}

View File

@ -0,0 +1,273 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.openjpa.slice.jdbc;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.SQLWarning;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
/**
* A template for multiple Statements being executed by multiple connections.
*
* @author Pinaki Poddar
*
*/
class DistributedTemplate<T extends Statement>
implements Statement, Iterable<T> {
protected List<T> stmts = new ArrayList<T>();
protected final DistributedConnection con;
protected T master;
public DistributedTemplate(DistributedConnection c) {
con = c;
}
public Iterator<T> iterator() {
return stmts.iterator();
}
public void add(T s) {
if (stmts.isEmpty())
master = s;
try {
if (!con.contains(s.getConnection()))
throw new IllegalArgumentException(s + " has different connection");
stmts.add(s);
} catch (SQLException e) {
e.printStackTrace();
}
}
public void addBatch(String sql) throws SQLException {
for (T s:this)
s.addBatch(sql);
}
public void cancel() throws SQLException {
for (T s:this)
s.cancel();
}
public void clearBatch() throws SQLException {
for (T s:this)
s.clearBatch();
}
public void clearWarnings() throws SQLException {
for (T s:this)
s.clearWarnings();
}
public void close() throws SQLException {
for (T s:this)
s.close();
}
public boolean execute(String arg0) throws SQLException {
boolean ret = true;
for (T s:this)
ret = s.execute(arg0) & ret;
return ret;
}
public boolean execute(String arg0, int arg1) throws SQLException {
boolean ret = true;
for (T s:this)
ret = s.execute(arg0, arg1) & ret;
return ret;
}
public boolean execute(String arg0, int[] arg1) throws SQLException {
boolean ret = true;
for (T s:this)
ret = s.execute(arg0, arg1) & ret;
return ret;
}
public boolean execute(String arg0, String[] arg1) throws SQLException {
boolean ret = true;
for (T s:this)
ret = s.execute(arg0, arg1) & ret;
return ret;
}
public int[] executeBatch() throws SQLException {
int[] ret = new int[0];
for (Statement s:this) {
int[] tmp = s.executeBatch();
ret = new int[ret.length + tmp.length];
System.arraycopy(tmp, 0, ret, ret.length-tmp.length, tmp.length);
}
return ret;
}
public ResultSet executeQuery() throws SQLException {
DistributedResultSet rs = new DistributedResultSet();
for (T s:this)
rs.add(s.executeQuery(null));
return rs;
}
public ResultSet executeQuery(String arg0) throws SQLException {
DistributedResultSet rs = new DistributedResultSet();
for (T s:this)
rs.add(s.executeQuery(arg0));
return rs;
}
public int executeUpdate(String arg0) throws SQLException {
int ret = 0;
for (T s:this)
ret += s.executeUpdate(arg0);
return ret;
}
public int executeUpdate(String arg0, int arg1) throws SQLException {
int ret = 0;
for (T s:this)
ret += s.executeUpdate(arg0, arg1);
return ret;
}
public int executeUpdate(String arg0, int[] arg1) throws SQLException {
int ret = 0;
for (T s:this)
ret += s.executeUpdate(arg0, arg1);
return ret;
}
public int executeUpdate(String arg0, String[] arg1) throws SQLException {
int ret = 0;
for (T s:this)
ret += s.executeUpdate(arg0, arg1);
return ret;
}
public Connection getConnection() throws SQLException {
return con;
}
public int getFetchDirection() throws SQLException {
return master.getFetchDirection();
}
public int getFetchSize() throws SQLException {
return master.getFetchSize();
}
public ResultSet getGeneratedKeys() throws SQLException {
DistributedResultSet mrs = new DistributedResultSet();
for (T s:this)
mrs.add(s.getGeneratedKeys());
return mrs;
}
public int getMaxFieldSize() throws SQLException {
return master.getMaxFieldSize();
}
public int getMaxRows() throws SQLException {
return master.getMaxRows();
}
public boolean getMoreResults() throws SQLException {
for (T s:this)
if (s.getMoreResults())
return true;
return false;
}
public boolean getMoreResults(int arg0) throws SQLException {
for (T s:this)
if (s.getMoreResults(arg0))
return true;
return false;
}
public int getQueryTimeout() throws SQLException {
return master.getQueryTimeout();
}
public ResultSet getResultSet() throws SQLException {
DistributedResultSet rs = new DistributedResultSet();
for (T s:this)
rs.add(s.getResultSet());
return rs;
}
public int getResultSetConcurrency() throws SQLException {
return master.getResultSetConcurrency();
}
public int getResultSetHoldability() throws SQLException {
return master.getResultSetHoldability();
}
public int getResultSetType() throws SQLException {
return master.getResultSetType();
}
public int getUpdateCount() throws SQLException {
return master.getUpdateCount();
}
public SQLWarning getWarnings() throws SQLException {
return master.getWarnings();
}
public void setCursorName(String name) throws SQLException {
for (T s:this)
s.setCursorName(name);
}
public void setEscapeProcessing(boolean flag) throws SQLException {
for (T s:this)
s.setEscapeProcessing(flag);
}
public void setFetchDirection(int dir) throws SQLException {
for (T s:this)
s.setFetchDirection(dir);
}
public void setFetchSize(int size) throws SQLException {
for (T s:this)
s.setFetchSize(size);
}
public void setMaxFieldSize(int size) throws SQLException {
for (T s:this)
s.setMaxFieldSize(size);
}
public void setMaxRows(int n) throws SQLException {
for (T s:this)
s.setMaxFieldSize(n);
}
public void setQueryTimeout(int n) throws SQLException {
for (T s:this)
s.setMaxFieldSize(n);
}
}

View File

@ -0,0 +1,113 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.openjpa.slice.jdbc;
import java.sql.Connection;
import java.sql.SQLException;
import javax.sql.DataSource;
import javax.sql.XAConnection;
import javax.sql.XADataSource;
import org.apache.openjpa.jdbc.kernel.JDBCStoreManager;
import org.apache.openjpa.lib.jdbc.DelegatingDataSource;
import org.apache.openjpa.lib.util.Localizer;
import org.apache.openjpa.slice.Slice;
import org.apache.openjpa.util.InternalException;
/**
* A specialized JDBCStoreManager for XA-complaint DataSource.
* If the configured DataSource is not XA-complaint, behaves as the super
* implementation.
*
* @author Pinaki Poddar
*
*/
public class SliceStoreManager extends JDBCStoreManager {
private final Slice _slice;
private Boolean isXAEnabled;
private XAConnection xcon;
private static final Localizer _loc =
Localizer.forPackage(SliceStoreManager.class);
/**
* Construct with immutable logical name of the slice.
*/
public SliceStoreManager(Slice slice) {
_slice = slice;
}
/**
* Gets the slice for which this receiver is working.
*/
public Slice getSlice() {
return _slice;
}
public String getName() {
return _slice.getName();
}
/**
* Gets the connection via XAConnection if the datasource is XA-complaint.
* Otherwise, behaves exactly as the super implementation.
*/
@Override
protected RefCountConnection connectInternal() throws SQLException {
if (!isXAEnabled)
return super.connectInternal();
XADataSource xds = getXADataSource();
xcon = xds.getXAConnection();
Connection con = xcon.getConnection();
return new RefCountConnection(con);
}
/**
* Gets the XAConnection if connected and XA-complaint. Otherwise null.
*/
public XAConnection getXAConnection() {
return xcon;
}
private XADataSource getXADataSource() {
if (!isXAEnabled())
throw new InternalException(_loc.get("slice-not-xa", this));
return (XADataSource)getInnerDataSource();
}
/**
* Affirms if the configured DataSource is XA-complaint.
* Can return null if the context has not been set yet.
*/
public boolean isXAEnabled() {
if (isXAEnabled == null) {
isXAEnabled = getInnerDataSource() instanceof XADataSource;
}
return isXAEnabled.booleanValue();
}
private DataSource getInnerDataSource() {
DataSource parent = super.getDataSource();
DataSource real = (parent instanceof DelegatingDataSource) ?
((DelegatingDataSource)parent).getInnermostDelegate()
: parent;
return real;
}
}

View File

@ -0,0 +1,148 @@
package org.apache.openjpa.slice.jdbc;
import org.apache.openjpa.kernel.StoreQuery;
import org.apache.openjpa.kernel.exps.QueryExpressions;
import org.apache.openjpa.kernel.exps.Value;
import org.apache.openjpa.lib.rop.ResultObjectProvider;
import org.apache.openjpa.lib.util.Localizer;
import org.apache.openjpa.util.InternalException;
import org.apache.openjpa.util.UserException;
/**
* Aggregates individual single query results from different databases.
*
* @author Pinaki Poddar
*
*/
public class UniqueResultObjectProvider implements ResultObjectProvider {
private final ResultObjectProvider[] _rops;
private final StoreQuery _query;
private final QueryExpressions[] _exps;
private Object _single;
private boolean _opened;
private static final String COUNT = "Count";
private static final String MAX = "Max";
private static final String MIN = "Min";
private static final String SUM = "Sum";
private static final Localizer _loc =
Localizer.forPackage(UniqueResultObjectProvider.class);
public UniqueResultObjectProvider(ResultObjectProvider[] rops,
StoreQuery q, QueryExpressions[] exps) {
_rops = rops;
_query = q;
_exps = exps;
}
public boolean absolute(int pos) throws Exception {
return false;
}
public void close() throws Exception {
_opened = false;
for (ResultObjectProvider rop:_rops)
rop.close();
}
public Object getResultObject() throws Exception {
if (!_opened)
throw new InternalException(_loc.get("not-open"));
return _single;
}
public void handleCheckedException(Exception e) {
_rops[0].handleCheckedException(e);
}
public boolean next() throws Exception {
if (!_opened) {
open();
}
if (_single != null)
return false;
Value[] values = _exps[0].projections;
Object[] single = new Object[values.length];
for (int i=0; i<values.length; i++) {
Value v = values[i];
boolean isAggregate = v.isAggregate();
String op = v.getClass().getSimpleName();
for (ResultObjectProvider rop:_rops) {
rop.next();
Object[] row = (Object[]) rop.getResultObject();
if (isAggregate) {
if (COUNT.equals(op)) {
single[i] = count(single[i], row[i]);
} else if (MAX.equals(op)) {
single[i] = max(single[i], row[i]);
} else if (MIN.equals(op)) {
single[i] = min(single[i], row[i]);
} else if (SUM.equals(op)) {
single[i] = sum(single[i], row[i]);
} else {
throw new UnsupportedOperationException
(_loc.get("aggregate-unsupported", op).toString());
}
} else {
single[i] = row[i];
}
}
}
_single = single;
return true;
}
Object count(Object current, Object other) {
if (current == null)
return other;
return ((Number)current).longValue() + ((Number)other).longValue();
}
Object max(Object current, Object other) {
if (current == null)
return other;
return Math.max(((Number)current).doubleValue(),
((Number)other).doubleValue());
}
Object min(Object current, Object other) {
if (current == null)
return other;
return Math.min(((Number)current).doubleValue(),
((Number)other).doubleValue());
}
Object sum(Object current, Object other) {
if (current == null)
return other;
return (((Number)current).doubleValue() +
((Number)other).doubleValue());
}
public void open() throws Exception {
for (ResultObjectProvider rop:_rops)
rop.open();
_opened = true;
}
public void reset() throws Exception {
_single = null;
for (ResultObjectProvider rop : _rops) {
rop.reset();
}
}
public int size() throws Exception {
return 1;
}
public boolean supportsRandomAccess() {
return false;
}
}

View File

@ -0,0 +1,11 @@
<HTML>
<BODY>
Implements Distributed version of JDBCStoreManager and JDBCStoreQuery.
This package contains implementaions of OpenJPA interfaces using a distribution
template pattern. Distribution template pattern for <code>T</code> is
defined as a type <code>T'</code>
such that <code>T' extends T implements Iterable<T></code> i.e.<code>T'</code>
is a special <code>T</code> that also contains many <code>T</code>.
</BODY>
</HTML>

View File

@ -0,0 +1,8 @@
<HTML>
<BODY>
Extended OpenJPA Interfaces for distributed databases.
This package contains interface definitions for distribution policy and
distributed configuration.
</BODY>
</HTML>

View File

@ -0,0 +1,75 @@
package org.apache.openjpa.slice.transaction;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import javax.transaction.HeuristicMixedException;
import javax.transaction.HeuristicRollbackException;
import javax.transaction.RollbackException;
import javax.transaction.Synchronization;
import javax.transaction.SystemException;
import javax.transaction.Transaction;
import javax.transaction.TransactionManager;
import javax.transaction.xa.XAResource;
import org.apache.openjpa.slice.jdbc.SliceStoreManager;
public class DistributedNaiveTransaction implements Transaction {
private Set<SliceStoreManager> _slices = new HashSet<SliceStoreManager>();
private Set<Synchronization> _syncs = new HashSet<Synchronization>();
private final TransactionManager _tm;
private int _status;
private boolean _rollbackOnly;
DistributedNaiveTransaction(TransactionManager tm) {
_tm = tm;
}
public void commit() throws HeuristicMixedException,
HeuristicRollbackException, RollbackException, SecurityException,
SystemException {
throw new UnsupportedOperationException();
}
public boolean delistResource(XAResource arg0, int arg1)
throws IllegalStateException, SystemException {
return _slices.remove(arg0);
}
public boolean enlistResource(XAResource arg0)
throws IllegalStateException, RollbackException, SystemException {
throw new UnsupportedOperationException();
}
public boolean enlistResource(SliceStoreManager arg0)
throws IllegalStateException, RollbackException, SystemException {
return _slices.add(arg0);
}
public int getStatus() throws SystemException {
return _status;
}
public void registerSynchronization(Synchronization arg0)
throws IllegalStateException, RollbackException, SystemException {
_syncs.add(arg0);
}
public void rollback() throws IllegalStateException, SystemException {
_tm.rollback();
}
public void setRollbackOnly() throws IllegalStateException, SystemException {
_rollbackOnly = true;
}
public boolean isRollbackOnly() {
return _rollbackOnly;
}
Set<SliceStoreManager> getEnlistedResources() {
return Collections.unmodifiableSet(_slices);
}
}

View File

@ -0,0 +1,191 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.openjpa.slice.transaction;
import static javax.transaction.xa.XAResource.TMJOIN;
import static javax.transaction.xa.XAResource.TMNOFLAGS;
import static javax.transaction.xa.XAResource.TMSUCCESS;
import java.util.Set;
import javax.transaction.HeuristicMixedException;
import javax.transaction.HeuristicRollbackException;
import javax.transaction.InvalidTransactionException;
import javax.transaction.NotSupportedException;
import javax.transaction.RollbackException;
import javax.transaction.SystemException;
import javax.transaction.Transaction;
import javax.transaction.TransactionManager;
import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
import org.apache.openjpa.lib.util.Localizer;
/**
* A simple (naive?) implementation for distributed transaction across
* XA-complaint data sources.
* Assumes begin() and commit() being called on the same thread.
*
* @author Pinaki Poddar
*
*/
public class DistributedTransactionManager implements TransactionManager {
private final ThreadLocal<DistributedXATransaction> txns =
new ThreadLocal<DistributedXATransaction>();
private static final Localizer _loc =
Localizer.forPackage(DistributedTransactionManager.class);
public void begin() throws NotSupportedException, SystemException {
DistributedXATransaction txn = getTransaction(false);
int i = 1;
Set<XAResource> resources = txn.getEnlistedResources();
for (XAResource resource : resources) {
try {
XAResource existing = isSame(resource, resources);
XID branch = txn.getXID().branch(i++);
int flag = (existing == null) ? TMNOFLAGS : TMJOIN;
resource.start(branch, flag);
} catch (Exception e) {
throw new SystemException(e.toString());
}
}
}
public void commit() throws HeuristicMixedException,
HeuristicRollbackException, IllegalStateException,
RollbackException, SecurityException, SystemException {
DistributedXATransaction txn = getTransaction(true);
Set<XAResource> resources = txn.getEnlistedResources();
int branchId = 1;
boolean nextPhase = true;
for (XAResource resource : resources) {
XID branch = txn.getXID().branch(branchId++);
try {
resource.end(branch, TMSUCCESS);
resource.prepare(branch);
} catch (XAException e) {
nextPhase = false;
}
}
branchId = 1; // reset
if (!nextPhase) {
for (XAResource resource : resources) {
try {
XID branch = txn.getXID().branch(branchId++);
resource.forget(branch);
} catch (XAException e) {
// ignore
}
throw new SystemException(_loc.get("prepare-failed")
.getMessage());
}
}
branchId = 1; // reset
for (XAResource resource : resources) {
XID branch = txn.getXID().branch(branchId++);
try {
resource.commit(branch, false);
} catch (XAException e) {
throw new SystemException(e.getMessage());
}
}
}
public int getStatus() throws SystemException {
return getTransaction().getStatus();
}
public Transaction getTransaction() throws SystemException {
return getTransaction(false);
}
public void resume(Transaction arg0) throws IllegalStateException,
InvalidTransactionException, SystemException {
throw new UnsupportedOperationException();
}
public void rollback() throws IllegalStateException, SecurityException,
SystemException {
DistributedXATransaction txn = getTransaction(true);
Set<XAResource> slices = txn.getEnlistedResources();
int branchId = 1;
for (XAResource slice : slices) {
XID branch = txn.getXID().branch(branchId++);
try {
slice.end(branch, XAResource.TMFAIL);
slice.rollback(branch);
} catch (XAException e) {
}
}
}
public void setRollbackOnly() throws IllegalStateException, SystemException {
getTransaction().setRollbackOnly();
}
public void setTransactionTimeout(int arg0) throws SystemException {
throw new UnsupportedOperationException();
}
public Transaction suspend() throws SystemException {
throw new UnsupportedOperationException();
}
XAResource isSame(XAResource rm, Set<XAResource> others) {
for (XAResource other : others)
try {
if (rm != other && other.isSameRM(rm))
return other;
} catch (XAException e) {
e.printStackTrace();
}
return null;
}
String toString(Object o) {
return o.getClass().getSimpleName() + "@"
+ Long.toHexString(System.identityHashCode(o));
}
/**
* Gets the transaction associated with the current thread.
*
* @param mustExist if true, a transaction must be associated with the
* current thread a priori. If false, the current thread has no associated
* transaction, a new transaction is created with a global identifier
* and associated with the current thread.
*/
DistributedXATransaction getTransaction(boolean mustExist) {
DistributedXATransaction txn = txns.get();
if (txn == null) {
if (mustExist)
throw new IllegalStateException(_loc.get("no-txn-on-thread",
Thread.currentThread().getName()).getMessage());
byte[] global =
Long.toHexString(System.currentTimeMillis()).getBytes();
XID xid = new XID(0, global, new byte[] { 0x1 });
txn = new DistributedXATransaction(xid, this);
txns.set(txn);
}
return txn;
}
}

View File

@ -0,0 +1,82 @@
package org.apache.openjpa.slice.transaction;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import javax.transaction.HeuristicMixedException;
import javax.transaction.HeuristicRollbackException;
import javax.transaction.RollbackException;
import javax.transaction.Synchronization;
import javax.transaction.SystemException;
import javax.transaction.Transaction;
import javax.transaction.TransactionManager;
import javax.transaction.xa.XAResource;
/**
* Internal implementation of a Transaction with an internal global transaction
* identifier scheme.
*
* @author Pinaki Poddar
*
*/
class DistributedXATransaction implements Transaction {
private static ThreadLocal<Transaction> _trans = new ThreadLocal<Transaction>();
private Set<XAResource> _slices = new HashSet<XAResource>();
private Set<Synchronization> _syncs = new HashSet<Synchronization>();
private final TransactionManager _tm;
private final XID xid;
private int _status;
private boolean _rollbackOnly;
/**
* Construct with
* @param xid
* @param tm
*/
DistributedXATransaction(XID xid, TransactionManager tm) {
this.xid = xid;
this._tm = tm;
}
public XID getXID() {
return xid;
}
public void commit() throws HeuristicMixedException,
HeuristicRollbackException, RollbackException, SecurityException,
SystemException {
_tm.commit();
}
public boolean delistResource(XAResource arg0, int arg1)
throws IllegalStateException, SystemException {
return _slices.remove(arg0);
}
public boolean enlistResource(XAResource arg0)
throws IllegalStateException, RollbackException, SystemException {
return _slices.add(arg0);
}
public int getStatus() throws SystemException {
return _status;
}
public void registerSynchronization(Synchronization arg0)
throws IllegalStateException, RollbackException, SystemException {
_syncs.add(arg0);
}
public void rollback() throws IllegalStateException, SystemException {
_tm.rollback();
}
public void setRollbackOnly() throws IllegalStateException, SystemException {
_rollbackOnly = true;
}
Set<XAResource> getEnlistedResources() {
return Collections.unmodifiableSet(_slices);
}
}

View File

@ -0,0 +1,96 @@
package org.apache.openjpa.slice.transaction;
import java.util.Set;
import javax.transaction.HeuristicMixedException;
import javax.transaction.HeuristicRollbackException;
import javax.transaction.InvalidTransactionException;
import javax.transaction.NotSupportedException;
import javax.transaction.RollbackException;
import javax.transaction.SystemException;
import javax.transaction.Transaction;
import javax.transaction.TransactionManager;
import org.apache.openjpa.lib.util.Localizer;
import org.apache.openjpa.slice.jdbc.SliceStoreManager;
/**
* A fake transaction manager which runs a serial commit or rollback across
* the enlisted non-XA resources without any two-phase commit protocol.
*
* @author Pinaki Poddar
*
*/
public class NaiveTransactionManager implements TransactionManager {
private final ThreadLocal<DistributedNaiveTransaction> _txns =
new ThreadLocal<DistributedNaiveTransaction>();
private static final Localizer _loc =
Localizer.forPackage(NaiveTransactionManager.class);
public void begin() throws NotSupportedException, SystemException {
DistributedNaiveTransaction txn = getTransaction(false);
Set<SliceStoreManager> slices = txn.getEnlistedResources();
for (SliceStoreManager slice : slices) {
slice.getConnection();
slice.begin();
}
}
public void commit() throws HeuristicMixedException,
HeuristicRollbackException, IllegalStateException,
RollbackException, SecurityException, SystemException {
DistributedNaiveTransaction txn = getTransaction(false);
Set<SliceStoreManager> slices = txn.getEnlistedResources();
for (SliceStoreManager slice : slices) {
slice.commit();
}
}
public int getStatus() throws SystemException {
return getTransaction().getStatus();
}
public Transaction getTransaction() throws SystemException {
return getTransaction(false);
}
public void resume(Transaction arg0) throws IllegalStateException,
InvalidTransactionException, SystemException {
throw new UnsupportedOperationException();
}
public void rollback() throws IllegalStateException, SecurityException,
SystemException {
DistributedNaiveTransaction txn = getTransaction(false);
Set<SliceStoreManager> slices = txn.getEnlistedResources();
for (SliceStoreManager slice : slices) {
slice.commit();
}
}
public void setRollbackOnly() throws IllegalStateException, SystemException {
getTransaction().setRollbackOnly();
}
public void setTransactionTimeout(int arg0) throws SystemException {
throw new UnsupportedOperationException();
}
public Transaction suspend() throws SystemException {
throw new UnsupportedOperationException();
}
DistributedNaiveTransaction getTransaction(boolean mustExist) {
DistributedNaiveTransaction txn = _txns.get();
if (txn == null) {
if (mustExist)
throw new IllegalStateException(_loc.get("no-txn-on-thread",
Thread.currentThread().getName()).getMessage());
txn = new DistributedNaiveTransaction(this);
_txns.set(txn);
}
return txn;
}
}

View File

@ -0,0 +1,83 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.openjpa.slice.transaction;
import javax.transaction.xa.Xid;
/**
* Internally used Global Transaction Identifier for two-phase distributed
* commit protocol.
*
* @author Pinaki Poddar
*
*/
class XID implements Xid {
private final int format;
private final byte[] global;
private final byte[] branch;
public XID(int format, byte[] global, byte[] branch) {
super();
this.format = format;
this.global = global;
this.branch = branch;
}
public byte[] getBranchQualifier() {
return branch;
}
public int getFormatId() {
return format;
}
public byte[] getGlobalTransactionId() {
return global;
}
XID branch(Number number) {
return branch((number == null) ? "null" : number.toString());
}
XID branch(String branch) {
return new XID(format, global, branch.getBytes());
}
public String toString() {
return new String(global) + ":" + new String(branch);
}
public boolean equals(Object other) {
if (other instanceof XID) {
XID that = (XID) other;
return format == that.format && equals(global, that.global)
&& equals(branch, that.branch);
}
return false;
}
boolean equals(byte[] a, byte[] b) {
if (a == null && b == null)
return true;
if (a == null || b == null)
return false;
return new String(a).equals(new String(b));
}
}

View File

@ -0,0 +1,10 @@
<HTML>
<BODY>
Implements TransactionManager to manage transactions across the database
slices. This package provides two flavors of TransactionManager: one provides
two-phase commit protocol when all underlying database slices is XA-complaint;
while the other runs commit/rollback simply looping across all database slices
when one or more underlying database slices is not XA-complaint and hence does
not guarantee atomic nature of transaction.
</BODY>
</HTML>

View File

@ -0,0 +1,18 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
org.apache.openjpa.slice.ProductDerivation

View File

@ -0,0 +1,49 @@
slice-not-found: No slice named "{0}" can be found. Available slices are "{1}"
slice-no-url: Slice "{0}" has no database URL. Specify a valid database URL \
as the value of "slice.{0}.ConnectionURL" property. ConnectionURL is the \
only mandatory property of a slice. If any other slice property \
is not specified, then the corresponding openjpa.* property value is used \
as default.
slice-none-configured: No slice has been configured. Specify slice.XYZ as \
property name to register a slice named XYZ.
slice-configuration: Slice "{0}" configured with "{1}"
slice-available: Detected slices "{0}" in configuration.
no-url: No ConnectionURL property has been specified.
wrong-url: URL "{0}" is invalid as database URL.
wrong-slice: Wrong slice "{0}" for "{1}"
slice-connect: Connecting to slice "{0}" at URL "{1}"
slice-connect-warn: Failed to connect to slice "{0}". Slice "{0}" will be \
ignored as configuration is set as lenient.
slice-connect-known-warn: Failed to connect to due to "{2}. \
Slice "{0}" will be ignored as configuration is set as lenient.
slice-connect-error: Failed to connect to URL "{1}"
slice-connect-known-error: Failed to connect to URL "{1} due to {2}
bad-policy-slice:Distribution policy "{0}" has returned invalid slice \
"{1}" for "{2}". The valid slices are {3}. This error may happen \
when one or more of the originally configured slices are unavailable \
and Lenient property is set to true.
slice-xa-enabled: All slices "{0}" is XA-complaint and hence store transaction \
will use a two-phase commit protocol even if the persistent unit is \
configured for non-JTA transaction.
slice-xa-disabled: Not all active slices "{0}" is XA-complaint and hence store \
transaction will not use a two-phase commit protocol. If persistent unit \
is configured for JTA transaction then the slices will participate in \
global transaction but otherwise the atomic nature of commit across all \
slices is not guaranteed.
two-phase: "{3}".{0}"(xid=[{4}]] Connection={1} XAConnection={2}
factory-init: Starting {0}
config-init: Configuring Slice {0}
no-slice-names: Slice identifiers are not listed in [slice.Names] property. \
The configuration will be scanned to determine slice identifiers.
no-master-slice: No master slice has been configured explicitly in \
[slice.Master] property. The first slice "{0}" in the list of configured \
slices will be used as master.
resource-xa-tm-not-2pc: All slices is using XA-complaint driver but the \
configured "{0}" transaction manager is not capable of enlisting XA-aware \
resources. See slice.TransactionManager property documentation to \
configure XA-aware Transaction Manager capable for two-phase commit.
resource-not-xa-tm-2pc: One or more slices is not using XA-complaint driver \
but the configured "{0}" transaction manager can only enlist XA-aware \
resource.
not-open: The underlying result sets are not open.
aggregate-unsupported: The query uses unsupported aggregate operation "{0}".

View File

@ -0,0 +1,4 @@
bad-policy-slice:Distribution policy "{0}" has returned invalid slice \
"{1}" for "{2}". The valid slices are {3}. This error may happen \
when one or more of the originally configured slices are unavailable \
and Lenient property is set to true.

View File

@ -0,0 +1,3 @@
no-txn-on-thread: No transaction is associated with current thread "{0}"
prepare-failed: one or more XA-complaint resources have failed to prepare for \
commit during the first phase of a two-phase commit protocol.

View File

@ -84,7 +84,8 @@
<module>openjpa-lib</module>
<module>openjpa-kernel</module>
<module>openjpa-jdbc</module>
<module>openjpa-xmlstore</module>
<module>openjpa-xmlstore</module>
<module>openjpa-slice</module>
<module>openjpa-all</module>
<module>openjpa-project</module>
<module>openjpa-integration</module>
@ -100,7 +101,8 @@
<module>openjpa-persistence</module>
<module>openjpa-persistence-jdbc</module>
<module>openjpa-kernel-5</module>
<module>openjpa-jdbc-5</module>
<module>openjpa-jdbc-5</module>
<module>openjpa-slice</module>
<module>openjpa-examples</module>
</modules>
</profile>