diff --git a/CHANGES.txt b/CHANGES.txt index fb2654e984d..704b06e2a6e 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -226,6 +226,8 @@ New Features 55. SOLR-1116: Add a Binary FieldType (noble) +56. SOLR-1051: Support the merge of multiple indexes as a CoreAdmin and an update command (Ning Li via shalin) + Optimizations ---------------------- 1. SOLR-374: Use IndexReader.reopen to save resources by re-using parts of the diff --git a/src/common/org/apache/solr/common/params/CoreAdminParams.java b/src/common/org/apache/solr/common/params/CoreAdminParams.java index 4c8c1382677..e198227702a 100644 --- a/src/common/org/apache/solr/common/params/CoreAdminParams.java +++ b/src/common/org/apache/solr/common/params/CoreAdminParams.java @@ -53,6 +53,10 @@ public interface CoreAdminParams /** If you specify a file, what is its name **/ public final static String FILE = "file"; + /** If you merge indexes, what are the index directories. + * The directories are separated by ",". */ + public final static String INDEX_DIRS = "indexDirs"; + public enum CoreAdminAction { STATUS, LOAD, @@ -62,7 +66,8 @@ public interface CoreAdminParams PERSIST, SWAP, RENAME, - ALIAS; + ALIAS, + MERGEINDEXES; public static CoreAdminAction get( String p ) { diff --git a/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java b/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java index a3ed3a84182..7ad3316dcad 100644 --- a/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java +++ b/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java @@ -21,16 +21,22 @@ import org.apache.solr.common.SolrException; import org.apache.solr.common.params.CoreAdminParams; import org.apache.solr.common.params.CoreAdminParams.CoreAdminAction; import org.apache.solr.common.params.SolrParams; +import org.apache.solr.common.params.UpdateParams; import org.apache.solr.common.util.NamedList; import org.apache.solr.common.util.SimpleOrderedMap; import org.apache.solr.core.CoreContainer; import org.apache.solr.core.CoreDescriptor; import org.apache.solr.core.SolrCore; +import org.apache.solr.core.DirectoryFactory; import org.apache.solr.handler.RequestHandlerBase; import org.apache.solr.request.SolrQueryRequest; import org.apache.solr.request.SolrQueryResponse; import org.apache.solr.search.SolrIndexSearcher; import org.apache.solr.util.RefCounted; +import org.apache.solr.update.MergeIndexesCommand; +import org.apache.solr.update.processor.UpdateRequestProcessor; +import org.apache.solr.update.processor.UpdateRequestProcessorChain; +import org.apache.lucene.store.Directory; import java.io.File; import java.io.IOException; @@ -141,11 +147,18 @@ public class CoreAdminHandler extends RequestHandlerBase { break; } + case MERGEINDEXES: { + doPersist = this.handleMergeAction(req, rsp); + break; + } + default: { doPersist = this.handleCustomAction(req, rsp); break; } - } // switch + case LOAD: + break; + } } // Should we persist the changes? if (doPersist) { @@ -154,6 +167,37 @@ public class CoreAdminHandler extends RequestHandlerBase { } } + protected boolean handleMergeAction(SolrQueryRequest req, SolrQueryResponse rsp) throws IOException { + boolean doPersist = false; + SolrParams params = req.getParams(); + SolrParams required = params.required(); + String cname = required.get(CoreAdminParams.CORE); + SolrCore core = coreContainer.getCore(cname); + if (core != null) { + try { + doPersist = coreContainer.isPersistent(); + + String p = required.get(CoreAdminParams.INDEX_DIRS); + String[] dirNames = p.split(","); + + DirectoryFactory dirFactory = core.getDirectoryFactory(); + Directory[] dirs = new Directory[dirNames.length]; + for (int i = 0; i < dirNames.length; i++) { + dirs[i] = dirFactory.open(dirNames[i]); + } + + UpdateRequestProcessorChain processorChain = + core.getUpdateProcessingChain(params.get(UpdateParams.UPDATE_PROCESSOR)); + UpdateRequestProcessor processor = + processorChain.createProcessor(req, rsp); + processor.processMergeIndexes(new MergeIndexesCommand(dirs)); + } finally { + core.close(); + } + } + return doPersist; + } + /** * Handle Custom Action. *

diff --git a/src/java/org/apache/solr/update/DirectUpdateHandler.java b/src/java/org/apache/solr/update/DirectUpdateHandler.java index 4eafc2516a2..d77b37c9700 100644 --- a/src/java/org/apache/solr/update/DirectUpdateHandler.java +++ b/src/java/org/apache/solr/update/DirectUpdateHandler.java @@ -226,6 +226,12 @@ public class DirectUpdateHandler extends UpdateHandler { } ***************************/ + public int mergeIndexes(MergeIndexesCommand cmd) throws IOException { + throw new SolrException( + SolrException.ErrorCode.BAD_REQUEST, + "DirectUpdateHandler doesn't support mergeIndexes. Use DirectUpdateHandler2 instead."); + } + public void commit(CommitUpdateCommand cmd) throws IOException { Future[] waitSearcher = null; if (cmd.waitSearcher) { diff --git a/src/java/org/apache/solr/update/DirectUpdateHandler2.java b/src/java/org/apache/solr/update/DirectUpdateHandler2.java index 43d7fa98432..5a75c3c1c4f 100644 --- a/src/java/org/apache/solr/update/DirectUpdateHandler2.java +++ b/src/java/org/apache/solr/update/DirectUpdateHandler2.java @@ -29,6 +29,7 @@ import org.apache.lucene.search.Query; import org.apache.lucene.search.MatchAllDocsQuery; import org.apache.lucene.search.TermQuery; import org.apache.lucene.search.BooleanClause.Occur; +import org.apache.lucene.store.Directory; import java.util.concurrent.Executors; import java.util.concurrent.Future; @@ -129,6 +130,7 @@ public class DirectUpdateHandler2 extends UpdateHandler { AtomicLong deleteByIdCommandsCumulative= new AtomicLong(); AtomicLong deleteByQueryCommands= new AtomicLong(); AtomicLong deleteByQueryCommandsCumulative= new AtomicLong(); + AtomicLong mergeIndexesCommands = new AtomicLong(); AtomicLong commitCommands= new AtomicLong(); AtomicLong optimizeCommands= new AtomicLong(); AtomicLong rollbackCommands= new AtomicLong(); @@ -339,7 +341,35 @@ public class DirectUpdateHandler2 extends UpdateHandler { } } - public void forceOpenWriter() throws IOException { + public int mergeIndexes(MergeIndexesCommand cmd) throws IOException { + mergeIndexesCommands.incrementAndGet(); + int rc = -1; + + iwCommit.lock(); + try { + log.info("start " + cmd); + + Directory[] dirs = cmd.dirs; + if (dirs != null && dirs.length > 0) { + openWriter(); + writer.addIndexesNoOptimize(dirs); + rc = 1; + } else { + rc = 0; + } + log.info("end_mergeIndexes"); + } finally { + iwCommit.unlock(); + } + + if (rc == 1 && tracker.timeUpperBound > 0) { + tracker.scheduleCommitWithin(tracker.timeUpperBound); + } + + return rc; + } + + public void forceOpenWriter() throws IOException { iwCommit.lock(); try { openWriter(); diff --git a/src/java/org/apache/solr/update/MergeIndexesCommand.java b/src/java/org/apache/solr/update/MergeIndexesCommand.java new file mode 100644 index 00000000000..48b97fa7b4e --- /dev/null +++ b/src/java/org/apache/solr/update/MergeIndexesCommand.java @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.solr.update; + +import org.apache.lucene.store.Directory; + +/** + * A merge indexes command encapsulated in an object. + * + * @since solr 1.4 + * @version $Id$ + */ +public class MergeIndexesCommand extends UpdateCommand { + public Directory[] dirs; + + public MergeIndexesCommand() { + this(null); + } + + public MergeIndexesCommand(Directory[] dirs) { + super("mergeIndexes"); + this.dirs = dirs; + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(commandName); + sb.append(':'); + if (dirs != null && dirs.length > 0) { + sb.append(dirs[0]); + for (int i = 1; i < dirs.length; i++) { + sb.append(",").append(dirs[i]); + } + } + return sb.toString(); + } +} diff --git a/src/java/org/apache/solr/update/UpdateHandler.java b/src/java/org/apache/solr/update/UpdateHandler.java index 72a10c2d819..4efe4a74f6b 100644 --- a/src/java/org/apache/solr/update/UpdateHandler.java +++ b/src/java/org/apache/solr/update/UpdateHandler.java @@ -155,6 +155,7 @@ public abstract class UpdateHandler implements SolrInfoMBean { public abstract int addDoc(AddUpdateCommand cmd) throws IOException; public abstract void delete(DeleteUpdateCommand cmd) throws IOException; public abstract void deleteByQuery(DeleteUpdateCommand cmd) throws IOException; + public abstract int mergeIndexes(MergeIndexesCommand cmd) throws IOException; public abstract void commit(CommitUpdateCommand cmd) throws IOException; public abstract void rollback(RollbackUpdateCommand cmd) throws IOException; public abstract void close() throws IOException; diff --git a/src/java/org/apache/solr/update/processor/LogUpdateProcessorFactory.java b/src/java/org/apache/solr/update/processor/LogUpdateProcessorFactory.java index 7ddd8ed33ec..2f3681fcbe0 100644 --- a/src/java/org/apache/solr/update/processor/LogUpdateProcessorFactory.java +++ b/src/java/org/apache/solr/update/processor/LogUpdateProcessorFactory.java @@ -29,6 +29,7 @@ import org.apache.solr.request.SolrQueryResponse; import org.apache.solr.update.AddUpdateCommand; import org.apache.solr.update.CommitUpdateCommand; import org.apache.solr.update.DeleteUpdateCommand; +import org.apache.solr.update.MergeIndexesCommand; import org.apache.solr.update.RollbackUpdateCommand; /** @@ -125,6 +126,13 @@ class LogUpdateProcessor extends UpdateRequestProcessor { numDeletes++; } + @Override + public void processMergeIndexes(MergeIndexesCommand cmd) throws IOException { + if (next != null) next.processMergeIndexes(cmd); + + toLog.add("mergeIndexes", cmd.toString()); + } + @Override public void processCommit( CommitUpdateCommand cmd ) throws IOException { if (next != null) next.processCommit(cmd); diff --git a/src/java/org/apache/solr/update/processor/RunUpdateProcessorFactory.java b/src/java/org/apache/solr/update/processor/RunUpdateProcessorFactory.java index ed6c1c6e0a6..14ca807b90d 100644 --- a/src/java/org/apache/solr/update/processor/RunUpdateProcessorFactory.java +++ b/src/java/org/apache/solr/update/processor/RunUpdateProcessorFactory.java @@ -25,6 +25,7 @@ import org.apache.solr.update.AddUpdateCommand; import org.apache.solr.update.CommitUpdateCommand; import org.apache.solr.update.DeleteUpdateCommand; import org.apache.solr.update.DocumentBuilder; +import org.apache.solr.update.MergeIndexesCommand; import org.apache.solr.update.RollbackUpdateCommand; import org.apache.solr.update.UpdateHandler; @@ -72,6 +73,12 @@ class RunUpdateProcessor extends UpdateRequestProcessor super.processDelete(cmd); } + @Override + public void processMergeIndexes(MergeIndexesCommand cmd) throws IOException { + updateHandler.mergeIndexes(cmd); + super.processMergeIndexes(cmd); + } + @Override public void processCommit(CommitUpdateCommand cmd) throws IOException { diff --git a/src/java/org/apache/solr/update/processor/UpdateRequestProcessor.java b/src/java/org/apache/solr/update/processor/UpdateRequestProcessor.java index 0397645ad57..10e317d5023 100644 --- a/src/java/org/apache/solr/update/processor/UpdateRequestProcessor.java +++ b/src/java/org/apache/solr/update/processor/UpdateRequestProcessor.java @@ -24,6 +24,7 @@ import org.slf4j.LoggerFactory; import org.apache.solr.update.AddUpdateCommand; import org.apache.solr.update.CommitUpdateCommand; import org.apache.solr.update.DeleteUpdateCommand; +import org.apache.solr.update.MergeIndexesCommand; import org.apache.solr.update.RollbackUpdateCommand; @@ -56,6 +57,10 @@ public abstract class UpdateRequestProcessor { if (next != null) next.processDelete(cmd); } + public void processMergeIndexes(MergeIndexesCommand cmd) throws IOException { + if (next != null) next.processMergeIndexes(cmd); + } + public void processCommit(CommitUpdateCommand cmd) throws IOException { if (next != null) next.processCommit(cmd); diff --git a/src/solrj/org/apache/solr/client/solrj/request/CoreAdminRequest.java b/src/solrj/org/apache/solr/client/solrj/request/CoreAdminRequest.java index c2a23895235..1a284c1f008 100644 --- a/src/solrj/org/apache/solr/client/solrj/request/CoreAdminRequest.java +++ b/src/solrj/org/apache/solr/client/solrj/request/CoreAdminRequest.java @@ -109,6 +109,34 @@ public class CoreAdminRequest extends SolrRequest } } + public static class MergeIndexes extends CoreAdminRequest { + protected String indexDirs; + + public MergeIndexes() { + action = CoreAdminAction.MERGEINDEXES; + } + + public void setIndexDirs(String indexDirs) { + this.indexDirs = indexDirs; + } + + public String getIndexDirs() { + return indexDirs; + } + + @Override + public SolrParams getParams() { + if (action == null) { + throw new RuntimeException("no action specified!"); + } + ModifiableSolrParams params = new ModifiableSolrParams(); + params.set(CoreAdminParams.ACTION, action.toString()); + params.set(CoreAdminParams.CORE, core); + params.set(CoreAdminParams.INDEX_DIRS, indexDirs); + return params; + } + } + public CoreAdminRequest() { super( METHOD.GET, "/admin/cores" ); @@ -247,4 +275,23 @@ public class CoreAdminRequest extends SolrRequest req.setFileName(fileName); return req.process(server); } + + public static CoreAdminResponse mergeIndexes(String name, + String[] indexDirs, SolrServer server) throws SolrServerException, + IOException { + CoreAdminRequest.MergeIndexes req = new CoreAdminRequest.MergeIndexes(); + req.setCoreName(name); + String p = null; + if (indexDirs.length == 1) { + p = indexDirs[0]; + } else if (indexDirs.length > 1) { + StringBuilder s = new StringBuilder(indexDirs[0]); + for (int i = 1; i < indexDirs.length; i++) { + s.append(",").append(indexDirs[i]); + } + p = s.toString(); + } + req.setIndexDirs(p); + return req.process(server); + } } diff --git a/src/test/org/apache/solr/client/solrj/MergeIndexesExampleTestBase.java b/src/test/org/apache/solr/client/solrj/MergeIndexesExampleTestBase.java new file mode 100644 index 00000000000..195720be4bd --- /dev/null +++ b/src/test/org/apache/solr/client/solrj/MergeIndexesExampleTestBase.java @@ -0,0 +1,136 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.solr.client.solrj; + +import org.apache.solr.client.solrj.request.CoreAdminRequest; +import org.apache.solr.client.solrj.request.QueryRequest; +import org.apache.solr.client.solrj.request.UpdateRequest; +import org.apache.solr.client.solrj.request.UpdateRequest.ACTION; +import org.apache.solr.common.SolrInputDocument; +import org.apache.solr.core.CoreContainer; +import org.apache.solr.core.SolrCore; + +/** + * Abstract base class for testing merge indexes command + * + * @since solr 1.4 + * @version $Id$ + */ +public abstract class MergeIndexesExampleTestBase extends SolrExampleTestBase { + // protected static final CoreContainer cores = new CoreContainer(); + protected static CoreContainer cores; + + @Override + public String getSolrHome() { + return "../../../example/multicore/"; + } + + @Override + public String getSchemaFile() { + return getSolrHome() + "core0/conf/schema.xml"; + } + + @Override + public String getSolrConfigFile() { + return getSolrHome() + "core0/conf/solrconfig.xml"; + } + + @Override + public void setUp() throws Exception { + super.setUp(); + cores = h.getCoreContainer(); + SolrCore.log.info("CORES=" + cores + " : " + cores.getCoreNames()); + cores.setPersistent(false); + } + + @Override + protected final SolrServer getSolrServer() { + throw new UnsupportedOperationException(); + } + + @Override + protected final SolrServer createNewSolrServer() { + throw new UnsupportedOperationException(); + } + + protected abstract SolrServer getSolrCore0(); + + protected abstract SolrServer getSolrCore1(); + + protected abstract SolrServer getSolrAdmin(); + + protected abstract SolrServer getSolrCore(String name); + + protected abstract String getIndexDirCore1(); + + public void testMergeIndexes() throws Exception { + UpdateRequest up = new UpdateRequest(); + up.setAction(ACTION.COMMIT, true, true); + up.deleteByQuery("*:*"); + up.process(getSolrCore0()); + up.process(getSolrCore1()); + up.clear(); + + // Add something to each core + SolrInputDocument doc = new SolrInputDocument(); + doc.setField("id", "AAA"); + doc.setField("name", "core0"); + + // Add to core0 + up.add(doc); + up.process(getSolrCore0()); + + // Add to core1 + doc.setField("id", "BBB"); + doc.setField("name", "core1"); + up.add(doc); + up.process(getSolrCore1()); + + // Now Make sure AAA is in 0 and BBB in 1 + SolrQuery q = new SolrQuery(); + QueryRequest r = new QueryRequest(q); + q.setQuery("id:AAA"); + assertEquals(1, r.process(getSolrCore0()).getResults().size()); + assertEquals(0, r.process(getSolrCore1()).getResults().size()); + + assertEquals(1, + getSolrCore0().query(new SolrQuery("id:AAA")).getResults().size()); + assertEquals(0, + getSolrCore0().query(new SolrQuery("id:BBB")).getResults().size()); + + assertEquals(0, + getSolrCore1().query(new SolrQuery("id:AAA")).getResults().size()); + assertEquals(1, + getSolrCore1().query(new SolrQuery("id:BBB")).getResults().size()); + + // Now get the index directory of core1 and merge with core0 + String indexDir = getIndexDirCore1(); + String name = "core0"; + SolrServer coreadmin = getSolrAdmin(); + CoreAdminRequest.mergeIndexes(name, new String[] { indexDir }, coreadmin); + + // Now commit the merged index + up.clear(); // just do commit + up.process(getSolrCore0()); + + assertEquals(1, + getSolrCore0().query(new SolrQuery("id:AAA")).getResults().size()); + assertEquals(1, + getSolrCore0().query(new SolrQuery("id:BBB")).getResults().size()); + } +} diff --git a/src/test/org/apache/solr/client/solrj/embedded/MergeIndexesEmbeddedTest.java b/src/test/org/apache/solr/client/solrj/embedded/MergeIndexesEmbeddedTest.java new file mode 100644 index 00000000000..1a9164867bc --- /dev/null +++ b/src/test/org/apache/solr/client/solrj/embedded/MergeIndexesEmbeddedTest.java @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.solr.client.solrj.embedded; + +import java.io.File; + +import org.apache.solr.client.solrj.MergeIndexesExampleTestBase; +import org.apache.solr.client.solrj.SolrServer; +import org.apache.solr.core.SolrCore; + +/** + * Test for merge indexes command + * + * @since solr 1.4 + * @version $Id$ + */ +public class MergeIndexesEmbeddedTest extends MergeIndexesExampleTestBase { + + @Override + public void setUp() throws Exception { + super.setUp(); + + File home = new File(getSolrHome()); + File f = new File(home, "solr.xml"); + cores.load(getSolrHome(), f); + } + + @Override + protected SolrServer getSolrCore0() { + return new EmbeddedSolrServer(cores, "core0"); + } + + @Override + protected SolrServer getSolrCore1() { + return new EmbeddedSolrServer(cores, "core1"); + } + + @Override + protected SolrServer getSolrCore(String name) { + return new EmbeddedSolrServer(cores, name); + } + + @Override + protected SolrServer getSolrAdmin() { + return new EmbeddedSolrServer(cores, "core0"); + } + + @Override + protected String getIndexDirCore1() { + SolrCore core1 = cores.getCore("core1"); + String indexDir = core1.getIndexDir(); + core1.close(); + return indexDir; + } +}