OPENJPA-2365: Support for delete-by-query. Correction for hint processing

git-svn-id: https://svn.apache.org/repos/asf/openjpa/trunk@1464082 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Pinaki Poddar 2013-04-03 15:51:52 +00:00
parent 7f14c7df6b
commit 3d11895269
9 changed files with 229 additions and 29 deletions

View File

@ -584,7 +584,12 @@ public class JDBCStoreQuery
}
}
} finally {
try { conn.close(); } catch (SQLException se) {}
try {
if (conn.getAutoCommit())
conn.close();
} catch (SQLException se) {
}
}
localContext.remove();

View File

@ -1829,6 +1829,25 @@ public class BrokerImpl implements Broker, FindCallbacks, Cloneable, Serializabl
endOperation();
}
}
/**
* Sets the given flag to the status.
*
* @since 2.3.0
*/
protected void setStatusFlag(int flag) {
_flags |= flag;
}
/**
* Clears the given flag from the status.
*
* @since 2.3.0
*/
protected void clearStatusFlag(int flag) {
_flags &= ~flag;
}
public void flush() {
beginOperation(true);

View File

@ -347,6 +347,11 @@ public class QueryImpl<X> extends AbstractQuery<X> implements Serializable {
private boolean pushQueryFetchPlan() {
boolean fcPushed = false;
if (_hintHandler != null) {
FetchConfiguration fc = _fetch == null ? null : ((FetchPlanImpl)_fetch).getDelegate();
_em.pushFetchPlan(fc);
return true;
}
if (_fetch != null && _hintHandler != null) {
switch (_fetch.getReadLockMode()) {
case PESSIMISTIC_READ:
@ -528,6 +533,7 @@ public class QueryImpl<X> extends AbstractQuery<X> implements Serializable {
* cache.
*/
private boolean preExecute(Map params) {
PreparedQueryCache cache = _em.getPreparedQueryCache();
if (cache == null) {
return false;

View File

@ -152,6 +152,12 @@ public class DistributedBrokerImpl extends FinalizingBrokerImpl implements Distr
@Override
public void beginStore() {
}
@Override
protected void flush(int reason) {
setStatusFlag(2 << 8);
super.flush(reason);
}
/**
* Overrides to target specific slices for find() calls.

View File

@ -293,6 +293,20 @@ class DistributedJDBCStoreManager extends JDBCStoreManager
}
}
@Override
public void commit() {
for (SliceStoreManager slice : _slices) {
slice.commit();
}
}
@Override
public void rollback() {
for (SliceStoreManager slice : _slices) {
slice.rollback();
}
}
/**
* Collect the current versions of the given StateManagers.
*/

View File

@ -32,6 +32,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.openjpa.jdbc.kernel.JDBCStore;
import org.apache.openjpa.jdbc.kernel.JDBCStoreQuery;
import org.apache.openjpa.kernel.Broker;
import org.apache.openjpa.kernel.BrokerImpl;
import org.apache.openjpa.kernel.ExpressionStoreQuery;
import org.apache.openjpa.kernel.FetchConfiguration;
@ -118,15 +119,14 @@ class DistributedStoreQuery extends JDBCStoreQuery {
*/
public ResultObjectProvider executeQuery(StoreQuery q,
final Object[] params, final Range range) {
List<Future<ResultObjectProvider>> futures =
new ArrayList<Future<ResultObjectProvider>>();
List<Future<ResultObjectProvider>> futures = new ArrayList<Future<ResultObjectProvider>>();
final List<Executor> usedExecutors = new ArrayList<Executor>();
final List<ResultObjectProvider> rops =
new ArrayList<ResultObjectProvider>();
final List<ResultObjectProvider> rops = new ArrayList<ResultObjectProvider>();
List<SliceStoreManager> targets = findTargets();
QueryContext ctx = q.getContext();
boolean isReplicated = containsReplicated(ctx);
ExecutorService threadPool = SliceThread.getPool();
for (int i = 0; i < owner._queries.size(); i++) {
// if replicated, then execute only on single slice
if (isReplicated && !usedExecutors.isEmpty()) {
@ -135,16 +135,12 @@ class DistributedStoreQuery extends JDBCStoreQuery {
StoreManager sm = owner.getDistributedStore().getSlice(i);
if (!targets.contains(sm))
continue;
StoreQuery query = owner._queries.get(i);
Executor executor = executors.get(i);
if (!targets.contains(sm))
continue;
usedExecutors.add(executor);
QueryExecutor call = new QueryExecutor();
call.executor = executor;
call.query = query;
call.executor = executors.get(i);
call.query = owner._queries.get(i);
call.params = params;
call.range = range;
usedExecutors.add(call.executor);
futures.add(threadPool.submit(call));
}
for (Future<ResultObjectProvider> future : futures) {
@ -157,16 +153,14 @@ class DistributedStoreQuery extends JDBCStoreQuery {
}
}
ResultObjectProvider[] tmp = rops
.toArray(new ResultObjectProvider[rops.size()]);
ResultObjectProvider[] tmp = rops.toArray(new ResultObjectProvider[rops.size()]);
ResultObjectProvider result = null;
boolean[] ascending = getAscending(q);
boolean isAscending = ascending.length > 0;
boolean isAggregate = ctx.isAggregate();
boolean hasRange = ctx.getEndRange() != Long.MAX_VALUE;
if (isAggregate) {
result = new UniqueResultObjectProvider(tmp, q,
getQueryExpressions());
result = new UniqueResultObjectProvider(tmp, q, getQueryExpressions());
} else if (isAscending) {
result = new OrderingMergedResultObjectProvider(tmp, ascending,
usedExecutors.toArray(new Executor[usedExecutors.size()]),
@ -175,8 +169,7 @@ class DistributedStoreQuery extends JDBCStoreQuery {
result = new MergedResultObjectProvider(tmp);
}
if (hasRange) {
result = new RangeResultObjectProvider(result,
ctx.getStartRange(), ctx.getEndRange());
result = new RangeResultObjectProvider(result, ctx.getStartRange(), ctx.getEndRange());
}
return result;
}
@ -201,16 +194,18 @@ class DistributedStoreQuery extends JDBCStoreQuery {
}
public Number executeDelete(StoreQuery q, Object[] params) {
Iterator<StoreQuery> qs = owner._queries.iterator();
List<Future<Number>> futures = null;
List<Future<Number>> futures = new ArrayList<Future<Number>>();
int result = 0;
ExecutorService threadPool = SliceThread.getPool();
for (Executor ex : executors) {
if (futures == null)
futures = new ArrayList<Future<Number>>();
List<SliceStoreManager> targets = findTargets();
for (int i = 0; i < owner._queries.size(); i++) {
StoreManager sm = owner.getDistributedStore().getSlice(i);
if (!targets.contains(sm))
continue;
DeleteExecutor call = new DeleteExecutor();
call.executor = ex;
call.query = qs.next();
call.executor = executors.get(i);
call.query = owner._queries.get(i);
call.params = params;
futures.add(threadPool.submit(call));
}
@ -256,8 +251,7 @@ class DistributedStoreQuery extends JDBCStoreQuery {
}
List<SliceStoreManager> findTargets() {
FetchConfiguration fetch = owner.getContext()
.getFetchConfiguration();
FetchConfiguration fetch = owner.getContext().getFetchConfiguration();
return owner.getDistributedStore().getTargets(fetch);
}

View File

@ -0,0 +1,104 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.openjpa.slice;
import java.util.List;
import javax.persistence.EntityManager;
import javax.persistence.EntityManagerFactory;
import javax.persistence.Persistence;
import org.apache.openjpa.kernel.BrokerFactory;
import org.apache.openjpa.persistence.JPAFacadeHelper;
import org.apache.openjpa.slice.DistributedBrokerFactory;
import org.apache.openjpa.slice.SlicePersistence;
import org.apache.openjpa.slice.policy.UniformDistributionPolicy;
/**
* Tests delete-by-query.
*
* @author Pinaki Poddar
*
*/
public class TestBulkDelete extends SliceTestCase {
private static int SLICES = 3;
private static List<String> SLICE_NAMES;
@Override
protected String getPersistenceUnitName() {
return "slice";
}
public void setUp() throws Exception {
super.setUp(PObject.class, CLEAR_TABLES,
"openjpa.slice.DistributionPolicy", UniformDistributionPolicy.class.getName());
}
public void tearDown() throws Exception {
System.err.println("Delete all instances from all slices");
EntityManager em = emf.createEntityManager();
em.getTransaction().begin();
String delete = "delete from PObject p";
int m = em.createQuery(delete).executeUpdate();
em.getTransaction().commit();
super.tearDown();
}
/**
* Creates N instances that are distributed in 3 slices.
* Deletes all instances from only one slice.
*/
public void testBulkDelete() {
EntityManager em = emf.createEntityManager();
em.getTransaction().begin();
DistributedConfiguration conf = (DistributedConfiguration)emf.getConfiguration();
SLICE_NAMES = conf.getActiveSliceNames();
SLICES = SLICE_NAMES.size();
assertTrue(SLICES > 1);
int M = 4; // no of instances in each slice
int N = SLICES*M; // total number of instances in all 3 slices
for (int i = 0; i < N; i++) {
PObject pc = new PObject();
em.persist(pc);
}
em.getTransaction().commit();
String jpql = "select count(p) from PObject p";
long total = em.createQuery(jpql, Long.class).getSingleResult();
assertEquals(N, total);
for (int i = 0; i < SLICES; i++) {
System.err.println("Query only on slice [" + SLICE_NAMES.get(i) + "]");
long count = em.createQuery(jpql,Long.class)
.setHint(SlicePersistence.HINT_TARGET, SLICE_NAMES.get(i))
.getSingleResult();
assertEquals(M, count);
}
em.getTransaction().begin();
System.err.println("Delete only from slice [" + SLICE_NAMES.get(0) + "]");
String delete = "delete from PObject p";
int m = em.createQuery(delete)
.setHint(SlicePersistence.HINT_TARGET, SLICE_NAMES.get(0))
.executeUpdate();
assertEquals(M, m);
em.getTransaction().commit();
}
}

View File

@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.openjpa.slice.policy;
import java.util.List;
import org.apache.openjpa.slice.DistributionPolicy;
import org.apache.openjpa.slice.PObject;
/**
* Distributes the instances uniformly among the available slices
* based on the integral value of the persistence identifier.
* <br>
* Given {@code M} slices and {@code N} instances whose identity
* value is uniformly distributed, this policy will persist these
* instances such that
* <LI>each slice will have N/M instances
* <LI>the identity of the instances in the {@code i}-th slice
* will be divisible by {@code i}.
*
* @author Pinaki Poddar
*
*/
public class UniformDistributionPolicy implements DistributionPolicy {
@Override
public String distribute(Object pc, List<String> slices, Object context) {
int N = slices.size();
for (int i = N; i > 0; i--) {
PObject p = (PObject)pc;
if (p.getId()%i == 0) return slices.get(i-1);
}
return slices.get(0);
}
}

View File

@ -18,7 +18,6 @@
@rem
@setlocal
pushd openjpa-persistence-jdbc
mvn test -Dtest=%1 %2 %3 %4
mvn test -DfailIfNoTests=false -Dbuild.enhance=false -Dtest=%1 %2 %3 %4
popd
@endlocal