diff --git a/openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/kernel/JDBCStoreQuery.java b/openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/kernel/JDBCStoreQuery.java index 269779c6e..15475a5a7 100644 --- a/openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/kernel/JDBCStoreQuery.java +++ b/openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/kernel/JDBCStoreQuery.java @@ -584,7 +584,12 @@ public class JDBCStoreQuery } } } finally { - try { conn.close(); } catch (SQLException se) {} + try { + if (conn.getAutoCommit()) + conn.close(); + } catch (SQLException se) { + + } } localContext.remove(); diff --git a/openjpa-kernel/src/main/java/org/apache/openjpa/kernel/BrokerImpl.java b/openjpa-kernel/src/main/java/org/apache/openjpa/kernel/BrokerImpl.java index 5794d3a5d..650e04bab 100644 --- a/openjpa-kernel/src/main/java/org/apache/openjpa/kernel/BrokerImpl.java +++ b/openjpa-kernel/src/main/java/org/apache/openjpa/kernel/BrokerImpl.java @@ -1829,6 +1829,25 @@ public class BrokerImpl implements Broker, FindCallbacks, Cloneable, Serializabl endOperation(); } } + + /** + * Sets the given flag to the status. + * + * @since 2.3.0 + */ + protected void setStatusFlag(int flag) { + _flags |= flag; + } + + /** + * Clears the given flag from the status. + * + * @since 2.3.0 + */ + protected void clearStatusFlag(int flag) { + _flags &= ~flag; + } + public void flush() { beginOperation(true); diff --git a/openjpa-persistence/src/main/java/org/apache/openjpa/persistence/QueryImpl.java b/openjpa-persistence/src/main/java/org/apache/openjpa/persistence/QueryImpl.java index 200186946..8de7dc64c 100644 --- a/openjpa-persistence/src/main/java/org/apache/openjpa/persistence/QueryImpl.java +++ b/openjpa-persistence/src/main/java/org/apache/openjpa/persistence/QueryImpl.java @@ -347,6 +347,11 @@ public class QueryImpl extends AbstractQuery implements Serializable { private boolean pushQueryFetchPlan() { boolean fcPushed = false; + if (_hintHandler != null) { + FetchConfiguration fc = _fetch == null ? null : ((FetchPlanImpl)_fetch).getDelegate(); + _em.pushFetchPlan(fc); + return true; + } if (_fetch != null && _hintHandler != null) { switch (_fetch.getReadLockMode()) { case PESSIMISTIC_READ: @@ -528,6 +533,7 @@ public class QueryImpl extends AbstractQuery implements Serializable { * cache. */ private boolean preExecute(Map params) { + PreparedQueryCache cache = _em.getPreparedQueryCache(); if (cache == null) { return false; diff --git a/openjpa-slice/src/main/java/org/apache/openjpa/slice/DistributedBrokerImpl.java b/openjpa-slice/src/main/java/org/apache/openjpa/slice/DistributedBrokerImpl.java index 19e6dd23e..30235ca52 100644 --- a/openjpa-slice/src/main/java/org/apache/openjpa/slice/DistributedBrokerImpl.java +++ b/openjpa-slice/src/main/java/org/apache/openjpa/slice/DistributedBrokerImpl.java @@ -152,6 +152,12 @@ public class DistributedBrokerImpl extends FinalizingBrokerImpl implements Distr @Override public void beginStore() { } + + @Override + protected void flush(int reason) { + setStatusFlag(2 << 8); + super.flush(reason); + } /** * Overrides to target specific slices for find() calls. diff --git a/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedJDBCStoreManager.java b/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedJDBCStoreManager.java index 605fab229..ff7e0222a 100644 --- a/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedJDBCStoreManager.java +++ b/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedJDBCStoreManager.java @@ -293,6 +293,20 @@ class DistributedJDBCStoreManager extends JDBCStoreManager } } + @Override + public void commit() { + for (SliceStoreManager slice : _slices) { + slice.commit(); + } + } + + @Override + public void rollback() { + for (SliceStoreManager slice : _slices) { + slice.rollback(); + } + } + /** * Collect the current versions of the given StateManagers. */ diff --git a/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedStoreQuery.java b/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedStoreQuery.java index d27b8d328..6a0462e56 100644 --- a/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedStoreQuery.java +++ b/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedStoreQuery.java @@ -32,6 +32,7 @@ import java.util.concurrent.TimeUnit; import org.apache.openjpa.jdbc.kernel.JDBCStore; import org.apache.openjpa.jdbc.kernel.JDBCStoreQuery; +import org.apache.openjpa.kernel.Broker; import org.apache.openjpa.kernel.BrokerImpl; import org.apache.openjpa.kernel.ExpressionStoreQuery; import org.apache.openjpa.kernel.FetchConfiguration; @@ -118,15 +119,14 @@ class DistributedStoreQuery extends JDBCStoreQuery { */ public ResultObjectProvider executeQuery(StoreQuery q, final Object[] params, final Range range) { - List> futures = - new ArrayList>(); + List> futures = new ArrayList>(); final List usedExecutors = new ArrayList(); - final List rops = - new ArrayList(); + final List rops = new ArrayList(); List targets = findTargets(); QueryContext ctx = q.getContext(); boolean isReplicated = containsReplicated(ctx); ExecutorService threadPool = SliceThread.getPool(); + for (int i = 0; i < owner._queries.size(); i++) { // if replicated, then execute only on single slice if (isReplicated && !usedExecutors.isEmpty()) { @@ -135,16 +135,12 @@ class DistributedStoreQuery extends JDBCStoreQuery { StoreManager sm = owner.getDistributedStore().getSlice(i); if (!targets.contains(sm)) continue; - StoreQuery query = owner._queries.get(i); - Executor executor = executors.get(i); - if (!targets.contains(sm)) - continue; - usedExecutors.add(executor); QueryExecutor call = new QueryExecutor(); - call.executor = executor; - call.query = query; + call.executor = executors.get(i); + call.query = owner._queries.get(i); call.params = params; call.range = range; + usedExecutors.add(call.executor); futures.add(threadPool.submit(call)); } for (Future future : futures) { @@ -157,16 +153,14 @@ class DistributedStoreQuery extends JDBCStoreQuery { } } - ResultObjectProvider[] tmp = rops - .toArray(new ResultObjectProvider[rops.size()]); + ResultObjectProvider[] tmp = rops.toArray(new ResultObjectProvider[rops.size()]); ResultObjectProvider result = null; boolean[] ascending = getAscending(q); boolean isAscending = ascending.length > 0; boolean isAggregate = ctx.isAggregate(); boolean hasRange = ctx.getEndRange() != Long.MAX_VALUE; if (isAggregate) { - result = new UniqueResultObjectProvider(tmp, q, - getQueryExpressions()); + result = new UniqueResultObjectProvider(tmp, q, getQueryExpressions()); } else if (isAscending) { result = new OrderingMergedResultObjectProvider(tmp, ascending, usedExecutors.toArray(new Executor[usedExecutors.size()]), @@ -175,8 +169,7 @@ class DistributedStoreQuery extends JDBCStoreQuery { result = new MergedResultObjectProvider(tmp); } if (hasRange) { - result = new RangeResultObjectProvider(result, - ctx.getStartRange(), ctx.getEndRange()); + result = new RangeResultObjectProvider(result, ctx.getStartRange(), ctx.getEndRange()); } return result; } @@ -201,16 +194,18 @@ class DistributedStoreQuery extends JDBCStoreQuery { } public Number executeDelete(StoreQuery q, Object[] params) { - Iterator qs = owner._queries.iterator(); - List> futures = null; + List> futures = new ArrayList>(); int result = 0; ExecutorService threadPool = SliceThread.getPool(); - for (Executor ex : executors) { - if (futures == null) - futures = new ArrayList>(); + List targets = findTargets(); + for (int i = 0; i < owner._queries.size(); i++) { + StoreManager sm = owner.getDistributedStore().getSlice(i); + if (!targets.contains(sm)) + continue; + DeleteExecutor call = new DeleteExecutor(); - call.executor = ex; - call.query = qs.next(); + call.executor = executors.get(i); + call.query = owner._queries.get(i); call.params = params; futures.add(threadPool.submit(call)); } @@ -256,8 +251,7 @@ class DistributedStoreQuery extends JDBCStoreQuery { } List findTargets() { - FetchConfiguration fetch = owner.getContext() - .getFetchConfiguration(); + FetchConfiguration fetch = owner.getContext().getFetchConfiguration(); return owner.getDistributedStore().getTargets(fetch); } diff --git a/openjpa-slice/src/test/java/org/apache/openjpa/slice/TestBulkDelete.java b/openjpa-slice/src/test/java/org/apache/openjpa/slice/TestBulkDelete.java new file mode 100644 index 000000000..6f9359611 --- /dev/null +++ b/openjpa-slice/src/test/java/org/apache/openjpa/slice/TestBulkDelete.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.openjpa.slice; + +import java.util.List; + +import javax.persistence.EntityManager; +import javax.persistence.EntityManagerFactory; +import javax.persistence.Persistence; + + +import org.apache.openjpa.kernel.BrokerFactory; +import org.apache.openjpa.persistence.JPAFacadeHelper; +import org.apache.openjpa.slice.DistributedBrokerFactory; +import org.apache.openjpa.slice.SlicePersistence; +import org.apache.openjpa.slice.policy.UniformDistributionPolicy; + +/** + * Tests delete-by-query. + * + * @author Pinaki Poddar + * + */ +public class TestBulkDelete extends SliceTestCase { + private static int SLICES = 3; + private static List SLICE_NAMES; + + @Override + protected String getPersistenceUnitName() { + return "slice"; + } + public void setUp() throws Exception { + super.setUp(PObject.class, CLEAR_TABLES, + "openjpa.slice.DistributionPolicy", UniformDistributionPolicy.class.getName()); + + } + + public void tearDown() throws Exception { + System.err.println("Delete all instances from all slices"); + EntityManager em = emf.createEntityManager(); + em.getTransaction().begin(); + String delete = "delete from PObject p"; + int m = em.createQuery(delete).executeUpdate(); + em.getTransaction().commit(); + super.tearDown(); + } + + /** + * Creates N instances that are distributed in 3 slices. + * Deletes all instances from only one slice. + */ + public void testBulkDelete() { + EntityManager em = emf.createEntityManager(); + em.getTransaction().begin(); + DistributedConfiguration conf = (DistributedConfiguration)emf.getConfiguration(); + SLICE_NAMES = conf.getActiveSliceNames(); + SLICES = SLICE_NAMES.size(); + assertTrue(SLICES > 1); + int M = 4; // no of instances in each slice + int N = SLICES*M; // total number of instances in all 3 slices + + for (int i = 0; i < N; i++) { + PObject pc = new PObject(); + em.persist(pc); + } + em.getTransaction().commit(); + String jpql = "select count(p) from PObject p"; + long total = em.createQuery(jpql, Long.class).getSingleResult(); + assertEquals(N, total); + + for (int i = 0; i < SLICES; i++) { + System.err.println("Query only on slice [" + SLICE_NAMES.get(i) + "]"); + long count = em.createQuery(jpql,Long.class) + .setHint(SlicePersistence.HINT_TARGET, SLICE_NAMES.get(i)) + .getSingleResult(); + assertEquals(M, count); + } + + em.getTransaction().begin(); + System.err.println("Delete only from slice [" + SLICE_NAMES.get(0) + "]"); + String delete = "delete from PObject p"; + int m = em.createQuery(delete) + .setHint(SlicePersistence.HINT_TARGET, SLICE_NAMES.get(0)) + .executeUpdate(); + assertEquals(M, m); + em.getTransaction().commit(); + } +} diff --git a/openjpa-slice/src/test/java/org/apache/openjpa/slice/policy/UniformDistributionPolicy.java b/openjpa-slice/src/test/java/org/apache/openjpa/slice/policy/UniformDistributionPolicy.java new file mode 100644 index 000000000..5034fc4c9 --- /dev/null +++ b/openjpa-slice/src/test/java/org/apache/openjpa/slice/policy/UniformDistributionPolicy.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.openjpa.slice.policy; + +import java.util.List; + +import org.apache.openjpa.slice.DistributionPolicy; +import org.apache.openjpa.slice.PObject; + + +/** + * Distributes the instances uniformly among the available slices + * based on the integral value of the persistence identifier. + *
+ * Given {@code M} slices and {@code N} instances whose identity + * value is uniformly distributed, this policy will persist these + * instances such that + *
  • each slice will have N/M instances + *
  • the identity of the instances in the {@code i}-th slice + * will be divisible by {@code i}. + * + * @author Pinaki Poddar + * + */ +public class UniformDistributionPolicy implements DistributionPolicy { + + @Override + public String distribute(Object pc, List slices, Object context) { + int N = slices.size(); + for (int i = N; i > 0; i--) { + PObject p = (PObject)pc; + if (p.getId()%i == 0) return slices.get(i-1); + } + return slices.get(0); + } + +} diff --git a/scripts/test.bat b/scripts/test.bat index 6aa540192..6baf634ec 100644 --- a/scripts/test.bat +++ b/scripts/test.bat @@ -18,7 +18,6 @@ @rem @setlocal -pushd openjpa-persistence-jdbc -mvn test -Dtest=%1 %2 %3 %4 +mvn test -DfailIfNoTests=false -Dbuild.enhance=false -Dtest=%1 %2 %3 %4 popd @endlocal