diff --git a/openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/kernel/SQLStoreQuery.java b/openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/kernel/SQLStoreQuery.java index c19c6b4c3..bdd17e99e 100644 --- a/openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/kernel/SQLStoreQuery.java +++ b/openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/kernel/SQLStoreQuery.java @@ -98,7 +98,7 @@ public class SQLStoreQuery /** * Executes the filter as a SQL query. */ - protected static class SQLExecutor + public static class SQLExecutor extends AbstractExecutor { private final ClassMetaData _meta; diff --git a/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedJDBCStoreManager.java b/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedJDBCStoreManager.java index 9392e4313..8cfd19440 100644 --- a/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedJDBCStoreManager.java +++ b/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedJDBCStoreManager.java @@ -41,6 +41,7 @@ import org.apache.openjpa.jdbc.kernel.ConnectionInfo; import org.apache.openjpa.jdbc.kernel.JDBCFetchConfigurationImpl; import org.apache.openjpa.jdbc.kernel.JDBCStore; import org.apache.openjpa.jdbc.kernel.JDBCStoreManager; +import org.apache.openjpa.jdbc.kernel.SQLStoreQuery; import org.apache.openjpa.jdbc.sql.Result; import org.apache.openjpa.jdbc.sql.ResultSetResult; import org.apache.openjpa.kernel.FetchConfiguration; @@ -437,7 +438,18 @@ class DistributedJDBCStoreManager extends JDBCStoreManager * Construct a distributed query to be executed against all the slices. */ public StoreQuery newQuery(String language) { + if (QueryLanguages.LANG_SQL.equals(language)) { + DistributedSQLStoreQuery ret = new DistributedSQLStoreQuery(this); + for (SliceStoreManager slice : _slices) { + ret.add(slice.newQuery(language)); + } + return ret; + } ExpressionParser parser = QueryLanguages.parserForLanguage(language); + if (parser == null) { + throw new UnsupportedOperationException("Language [" + language + "] not supported"); + } + DistributedStoreQuery ret = new DistributedStoreQuery(this, parser); for (SliceStoreManager slice : _slices) { ret.add(slice.newQuery(language)); diff --git a/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedSQLStoreQuery.java b/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedSQLStoreQuery.java new file mode 100644 index 000000000..7b2a86b24 --- /dev/null +++ b/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedSQLStoreQuery.java @@ -0,0 +1,293 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.openjpa.slice.jdbc; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import org.apache.openjpa.jdbc.kernel.JDBCStore; +import org.apache.openjpa.jdbc.kernel.JDBCStoreQuery; +import org.apache.openjpa.jdbc.kernel.SQLStoreQuery; +import org.apache.openjpa.kernel.AbstractStoreQuery; +import org.apache.openjpa.kernel.BrokerImpl; +import org.apache.openjpa.kernel.ExpressionStoreQuery; +import org.apache.openjpa.kernel.FetchConfiguration; +import org.apache.openjpa.kernel.OrderingMergedResultObjectProvider; +import org.apache.openjpa.kernel.QueryContext; +import org.apache.openjpa.kernel.QueryImpl; +import org.apache.openjpa.kernel.StoreManager; +import org.apache.openjpa.kernel.StoreQuery; +import org.apache.openjpa.kernel.exps.ExpressionParser; +import org.apache.openjpa.lib.rop.MergedResultObjectProvider; +import org.apache.openjpa.lib.rop.RangeResultObjectProvider; +import org.apache.openjpa.lib.rop.ResultObjectProvider; +import org.apache.openjpa.meta.ClassMetaData; +import org.apache.openjpa.slice.DistributedConfiguration; +import org.apache.openjpa.slice.SliceThread; +import org.apache.openjpa.util.StoreException; + +/** + * A query for distributed databases. + * + * @author Pinaki Poddar + * + */ +@SuppressWarnings("serial") +class DistributedSQLStoreQuery extends SQLStoreQuery { + private List _queries = new ArrayList(); + + public DistributedSQLStoreQuery(JDBCStore store) { + super(store); + } + + void add(StoreQuery q) { + _queries.add(q); + } + + public DistributedJDBCStoreManager getDistributedStore() { + return (DistributedJDBCStoreManager) getStore(); + } + + public Executor newDataStoreExecutor(ClassMetaData meta, boolean subs) { + boolean parallel = !getContext().getStoreContext().getBroker() + .getMultithreaded(); + ParallelExecutor ex = new ParallelExecutor(this, meta, parallel); + for (StoreQuery q : _queries) { + ex.addExecutor(q.newDataStoreExecutor(meta, subs)); + } + return ex; + } + + public void setContext(QueryContext ctx) { + super.setContext(ctx); + for (StoreQuery q : _queries) + q.setContext(ctx); + } + + /** + * Executes queries on multiple databases. + * + * @author Pinaki Poddar + * + */ + public static class ParallelExecutor extends + SQLStoreQuery.SQLExecutor { + private List executors = new ArrayList(); + private DistributedSQLStoreQuery owner = null; + + public ParallelExecutor(DistributedSQLStoreQuery dsq, ClassMetaData meta, boolean p) { + super(dsq, meta); + owner = dsq; + } + + public void addExecutor(Executor ex) { + executors.add(ex); + } + + /** + * Each child query must be executed with slice context and not the + * given query context. + */ + public ResultObjectProvider executeQuery(StoreQuery q, + final Object[] params, final Range range) { + List> futures = + new ArrayList>(); + final List usedExecutors = new ArrayList(); + final List rops = + new ArrayList(); + List targets = findTargets(); + QueryContext ctx = q.getContext(); + boolean isReplicated = containsReplicated(ctx); + ExecutorService threadPool = SliceThread.getPool(); + for (int i = 0; i < owner._queries.size(); i++) { + // if replicated, then execute only on single slice + if (isReplicated && !usedExecutors.isEmpty()) { + break; + } + StoreManager sm = owner.getDistributedStore().getSlice(i); + if (!targets.contains(sm)) + continue; + StoreQuery query = owner._queries.get(i); + Executor executor = executors.get(i); + if (!targets.contains(sm)) + continue; + usedExecutors.add(executor); + QueryExecutor call = new QueryExecutor(); + call.executor = executor; + call.query = query; + call.params = params; + call.range = range; + futures.add(threadPool.submit(call)); + } + for (Future future : futures) { + try { + rops.add(future.get()); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } catch (ExecutionException e) { + throw new StoreException(e.getCause()); + } + } + + ResultObjectProvider[] tmp = rops + .toArray(new ResultObjectProvider[rops.size()]); + ResultObjectProvider result = null; + boolean[] ascending = getAscending(q); + boolean isAscending = ascending.length > 0; + boolean isAggregate = ctx.isAggregate(); + boolean hasRange = ctx.getEndRange() != Long.MAX_VALUE; + if (isAggregate) { + result = new UniqueResultObjectProvider(tmp, q, + getQueryExpressions()); + } else if (isAscending) { + result = new OrderingMergedResultObjectProvider(tmp, ascending, + usedExecutors.toArray(new Executor[usedExecutors.size()]), + q, params); + } else { + result = new MergedResultObjectProvider(tmp); + } + if (hasRange) { + result = new RangeResultObjectProvider(result, + ctx.getStartRange(), ctx.getEndRange()); + } + return result; + } + + /** + * Scans metadata to find out if a replicated class is the candidate. + */ + boolean containsReplicated(QueryContext query) { + Class candidate = query.getCandidateType(); + DistributedConfiguration conf = (DistributedConfiguration)query.getStoreContext() + .getConfiguration(); + if (candidate != null) { + return conf.isReplicated(candidate); + } + ClassMetaData[] metas = query.getAccessPathMetaDatas(); + if (metas == null || metas.length < 1) + return false; + for (ClassMetaData meta : metas) + if (conf.isReplicated(meta.getDescribedType())) + return true; + return false; + } + + public Number executeDelete(StoreQuery q, Object[] params) { + Iterator qs = owner._queries.iterator(); + List> futures = null; + int result = 0; + ExecutorService threadPool = SliceThread.getPool(); + for (Executor ex : executors) { + if (futures == null) + futures = new ArrayList>(); + DeleteExecutor call = new DeleteExecutor(); + call.executor = ex; + call.query = qs.next(); + call.params = params; + futures.add(threadPool.submit(call)); + } + for (Future future : futures) { + try { + Number n = future.get(); + if (n != null) + result += n.intValue(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } catch (ExecutionException e) { + throw new StoreException(e.getCause()); + } + } + return result; + } + + public Number executeUpdate(StoreQuery q, Object[] params) { + Iterator qs = owner._queries.iterator(); + List> futures = null; + int result = 0; + ExecutorService threadPool = SliceThread.getPool(); + for (Executor ex : executors) { + if (futures == null) + futures = new ArrayList>(); + UpdateExecutor call = new UpdateExecutor(); + call.executor = ex; + call.query = qs.next(); + call.params = params; + futures.add(threadPool.submit(call)); + } + for (Future future : futures) { + try { + Number n = future.get(); + result += (n == null) ? 0 : n.intValue(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } catch (ExecutionException e) { + throw new StoreException(e.getCause()); + } + } + return result; + } + + List findTargets() { + FetchConfiguration fetch = owner.getContext() + .getFetchConfiguration(); + return owner.getDistributedStore().getTargets(fetch); + } + + } + + static class QueryExecutor implements Callable { + StoreQuery query; + Executor executor; + Object[] params; + Range range; + + public ResultObjectProvider call() throws Exception { + return executor.executeQuery(query, params, range); + } + } + + static class DeleteExecutor implements Callable { + StoreQuery query; + Executor executor; + Object[] params; + + public Number call() throws Exception { + return executor.executeDelete(query, params); + } + } + + static class UpdateExecutor implements Callable { + StoreQuery query; + Executor executor; + Object[] params; + + public Number call() throws Exception { + return executor.executeUpdate(query, params); + } + } +}