SOLR-1051 -- Support the merge of multiple indexes as a CoreAdmin and an update command

git-svn-id: https://svn.apache.org/repos/asf/lucene/solr/trunk@779423 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Shalin Shekhar Mangar 2009-05-28 04:16:41 +00:00
parent 3e7361e964
commit 8499550da6
13 changed files with 416 additions and 3 deletions

View File

@ -226,6 +226,8 @@ New Features
55. SOLR-1116: Add a Binary FieldType (noble)
56. SOLR-1051: Support the merge of multiple indexes as a CoreAdmin and an update command (Ning Li via shalin)
Optimizations
----------------------
1. SOLR-374: Use IndexReader.reopen to save resources by re-using parts of the

View File

@ -53,6 +53,10 @@ public interface CoreAdminParams
/** If you specify a file, what is its name **/
public final static String FILE = "file";
/** If you merge indexes, what are the index directories.
* The directories are separated by ",". */
public final static String INDEX_DIRS = "indexDirs";
public enum CoreAdminAction {
STATUS,
LOAD,
@ -62,7 +66,8 @@ public interface CoreAdminParams
PERSIST,
SWAP,
RENAME,
ALIAS;
ALIAS,
MERGEINDEXES;
public static CoreAdminAction get( String p )
{

View File

@ -21,16 +21,22 @@ import org.apache.solr.common.SolrException;
import org.apache.solr.common.params.CoreAdminParams;
import org.apache.solr.common.params.CoreAdminParams.CoreAdminAction;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.params.UpdateParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.SimpleOrderedMap;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.core.CoreDescriptor;
import org.apache.solr.core.SolrCore;
import org.apache.solr.core.DirectoryFactory;
import org.apache.solr.handler.RequestHandlerBase;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.request.SolrQueryResponse;
import org.apache.solr.search.SolrIndexSearcher;
import org.apache.solr.util.RefCounted;
import org.apache.solr.update.MergeIndexesCommand;
import org.apache.solr.update.processor.UpdateRequestProcessor;
import org.apache.solr.update.processor.UpdateRequestProcessorChain;
import org.apache.lucene.store.Directory;
import java.io.File;
import java.io.IOException;
@ -141,11 +147,18 @@ public class CoreAdminHandler extends RequestHandlerBase {
break;
}
case MERGEINDEXES: {
doPersist = this.handleMergeAction(req, rsp);
break;
}
default: {
doPersist = this.handleCustomAction(req, rsp);
break;
}
} // switch
case LOAD:
break;
}
}
// Should we persist the changes?
if (doPersist) {
@ -154,6 +167,37 @@ public class CoreAdminHandler extends RequestHandlerBase {
}
}
protected boolean handleMergeAction(SolrQueryRequest req, SolrQueryResponse rsp) throws IOException {
boolean doPersist = false;
SolrParams params = req.getParams();
SolrParams required = params.required();
String cname = required.get(CoreAdminParams.CORE);
SolrCore core = coreContainer.getCore(cname);
if (core != null) {
try {
doPersist = coreContainer.isPersistent();
String p = required.get(CoreAdminParams.INDEX_DIRS);
String[] dirNames = p.split(",");
DirectoryFactory dirFactory = core.getDirectoryFactory();
Directory[] dirs = new Directory[dirNames.length];
for (int i = 0; i < dirNames.length; i++) {
dirs[i] = dirFactory.open(dirNames[i]);
}
UpdateRequestProcessorChain processorChain =
core.getUpdateProcessingChain(params.get(UpdateParams.UPDATE_PROCESSOR));
UpdateRequestProcessor processor =
processorChain.createProcessor(req, rsp);
processor.processMergeIndexes(new MergeIndexesCommand(dirs));
} finally {
core.close();
}
}
return doPersist;
}
/**
* Handle Custom Action.
* <p/>

View File

@ -226,6 +226,12 @@ public class DirectUpdateHandler extends UpdateHandler {
}
***************************/
public int mergeIndexes(MergeIndexesCommand cmd) throws IOException {
throw new SolrException(
SolrException.ErrorCode.BAD_REQUEST,
"DirectUpdateHandler doesn't support mergeIndexes. Use DirectUpdateHandler2 instead.");
}
public void commit(CommitUpdateCommand cmd) throws IOException {
Future[] waitSearcher = null;
if (cmd.waitSearcher) {

View File

@ -29,6 +29,7 @@ import org.apache.lucene.search.Query;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.search.BooleanClause.Occur;
import org.apache.lucene.store.Directory;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
@ -129,6 +130,7 @@ public class DirectUpdateHandler2 extends UpdateHandler {
AtomicLong deleteByIdCommandsCumulative= new AtomicLong();
AtomicLong deleteByQueryCommands= new AtomicLong();
AtomicLong deleteByQueryCommandsCumulative= new AtomicLong();
AtomicLong mergeIndexesCommands = new AtomicLong();
AtomicLong commitCommands= new AtomicLong();
AtomicLong optimizeCommands= new AtomicLong();
AtomicLong rollbackCommands= new AtomicLong();
@ -339,7 +341,35 @@ public class DirectUpdateHandler2 extends UpdateHandler {
}
}
public void forceOpenWriter() throws IOException {
public int mergeIndexes(MergeIndexesCommand cmd) throws IOException {
mergeIndexesCommands.incrementAndGet();
int rc = -1;
iwCommit.lock();
try {
log.info("start " + cmd);
Directory[] dirs = cmd.dirs;
if (dirs != null && dirs.length > 0) {
openWriter();
writer.addIndexesNoOptimize(dirs);
rc = 1;
} else {
rc = 0;
}
log.info("end_mergeIndexes");
} finally {
iwCommit.unlock();
}
if (rc == 1 && tracker.timeUpperBound > 0) {
tracker.scheduleCommitWithin(tracker.timeUpperBound);
}
return rc;
}
public void forceOpenWriter() throws IOException {
iwCommit.lock();
try {
openWriter();

View File

@ -0,0 +1,52 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.update;
import org.apache.lucene.store.Directory;
/**
* A merge indexes command encapsulated in an object.
*
* @since solr 1.4
* @version $Id$
*/
public class MergeIndexesCommand extends UpdateCommand {
public Directory[] dirs;
public MergeIndexesCommand() {
this(null);
}
public MergeIndexesCommand(Directory[] dirs) {
super("mergeIndexes");
this.dirs = dirs;
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder(commandName);
sb.append(':');
if (dirs != null && dirs.length > 0) {
sb.append(dirs[0]);
for (int i = 1; i < dirs.length; i++) {
sb.append(",").append(dirs[i]);
}
}
return sb.toString();
}
}

View File

@ -155,6 +155,7 @@ public abstract class UpdateHandler implements SolrInfoMBean {
public abstract int addDoc(AddUpdateCommand cmd) throws IOException;
public abstract void delete(DeleteUpdateCommand cmd) throws IOException;
public abstract void deleteByQuery(DeleteUpdateCommand cmd) throws IOException;
public abstract int mergeIndexes(MergeIndexesCommand cmd) throws IOException;
public abstract void commit(CommitUpdateCommand cmd) throws IOException;
public abstract void rollback(RollbackUpdateCommand cmd) throws IOException;
public abstract void close() throws IOException;

View File

@ -29,6 +29,7 @@ import org.apache.solr.request.SolrQueryResponse;
import org.apache.solr.update.AddUpdateCommand;
import org.apache.solr.update.CommitUpdateCommand;
import org.apache.solr.update.DeleteUpdateCommand;
import org.apache.solr.update.MergeIndexesCommand;
import org.apache.solr.update.RollbackUpdateCommand;
/**
@ -125,6 +126,13 @@ class LogUpdateProcessor extends UpdateRequestProcessor {
numDeletes++;
}
@Override
public void processMergeIndexes(MergeIndexesCommand cmd) throws IOException {
if (next != null) next.processMergeIndexes(cmd);
toLog.add("mergeIndexes", cmd.toString());
}
@Override
public void processCommit( CommitUpdateCommand cmd ) throws IOException {
if (next != null) next.processCommit(cmd);

View File

@ -25,6 +25,7 @@ import org.apache.solr.update.AddUpdateCommand;
import org.apache.solr.update.CommitUpdateCommand;
import org.apache.solr.update.DeleteUpdateCommand;
import org.apache.solr.update.DocumentBuilder;
import org.apache.solr.update.MergeIndexesCommand;
import org.apache.solr.update.RollbackUpdateCommand;
import org.apache.solr.update.UpdateHandler;
@ -72,6 +73,12 @@ class RunUpdateProcessor extends UpdateRequestProcessor
super.processDelete(cmd);
}
@Override
public void processMergeIndexes(MergeIndexesCommand cmd) throws IOException {
updateHandler.mergeIndexes(cmd);
super.processMergeIndexes(cmd);
}
@Override
public void processCommit(CommitUpdateCommand cmd) throws IOException
{

View File

@ -24,6 +24,7 @@ import org.slf4j.LoggerFactory;
import org.apache.solr.update.AddUpdateCommand;
import org.apache.solr.update.CommitUpdateCommand;
import org.apache.solr.update.DeleteUpdateCommand;
import org.apache.solr.update.MergeIndexesCommand;
import org.apache.solr.update.RollbackUpdateCommand;
@ -56,6 +57,10 @@ public abstract class UpdateRequestProcessor {
if (next != null) next.processDelete(cmd);
}
public void processMergeIndexes(MergeIndexesCommand cmd) throws IOException {
if (next != null) next.processMergeIndexes(cmd);
}
public void processCommit(CommitUpdateCommand cmd) throws IOException
{
if (next != null) next.processCommit(cmd);

View File

@ -109,6 +109,34 @@ public class CoreAdminRequest extends SolrRequest
}
}
public static class MergeIndexes extends CoreAdminRequest {
protected String indexDirs;
public MergeIndexes() {
action = CoreAdminAction.MERGEINDEXES;
}
public void setIndexDirs(String indexDirs) {
this.indexDirs = indexDirs;
}
public String getIndexDirs() {
return indexDirs;
}
@Override
public SolrParams getParams() {
if (action == null) {
throw new RuntimeException("no action specified!");
}
ModifiableSolrParams params = new ModifiableSolrParams();
params.set(CoreAdminParams.ACTION, action.toString());
params.set(CoreAdminParams.CORE, core);
params.set(CoreAdminParams.INDEX_DIRS, indexDirs);
return params;
}
}
public CoreAdminRequest()
{
super( METHOD.GET, "/admin/cores" );
@ -247,4 +275,23 @@ public class CoreAdminRequest extends SolrRequest
req.setFileName(fileName);
return req.process(server);
}
public static CoreAdminResponse mergeIndexes(String name,
String[] indexDirs, SolrServer server) throws SolrServerException,
IOException {
CoreAdminRequest.MergeIndexes req = new CoreAdminRequest.MergeIndexes();
req.setCoreName(name);
String p = null;
if (indexDirs.length == 1) {
p = indexDirs[0];
} else if (indexDirs.length > 1) {
StringBuilder s = new StringBuilder(indexDirs[0]);
for (int i = 1; i < indexDirs.length; i++) {
s.append(",").append(indexDirs[i]);
}
p = s.toString();
}
req.setIndexDirs(p);
return req.process(server);
}
}

View File

@ -0,0 +1,136 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.client.solrj;
import org.apache.solr.client.solrj.request.CoreAdminRequest;
import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.client.solrj.request.UpdateRequest.ACTION;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.core.SolrCore;
/**
* Abstract base class for testing merge indexes command
*
* @since solr 1.4
* @version $Id$
*/
public abstract class MergeIndexesExampleTestBase extends SolrExampleTestBase {
// protected static final CoreContainer cores = new CoreContainer();
protected static CoreContainer cores;
@Override
public String getSolrHome() {
return "../../../example/multicore/";
}
@Override
public String getSchemaFile() {
return getSolrHome() + "core0/conf/schema.xml";
}
@Override
public String getSolrConfigFile() {
return getSolrHome() + "core0/conf/solrconfig.xml";
}
@Override
public void setUp() throws Exception {
super.setUp();
cores = h.getCoreContainer();
SolrCore.log.info("CORES=" + cores + " : " + cores.getCoreNames());
cores.setPersistent(false);
}
@Override
protected final SolrServer getSolrServer() {
throw new UnsupportedOperationException();
}
@Override
protected final SolrServer createNewSolrServer() {
throw new UnsupportedOperationException();
}
protected abstract SolrServer getSolrCore0();
protected abstract SolrServer getSolrCore1();
protected abstract SolrServer getSolrAdmin();
protected abstract SolrServer getSolrCore(String name);
protected abstract String getIndexDirCore1();
public void testMergeIndexes() throws Exception {
UpdateRequest up = new UpdateRequest();
up.setAction(ACTION.COMMIT, true, true);
up.deleteByQuery("*:*");
up.process(getSolrCore0());
up.process(getSolrCore1());
up.clear();
// Add something to each core
SolrInputDocument doc = new SolrInputDocument();
doc.setField("id", "AAA");
doc.setField("name", "core0");
// Add to core0
up.add(doc);
up.process(getSolrCore0());
// Add to core1
doc.setField("id", "BBB");
doc.setField("name", "core1");
up.add(doc);
up.process(getSolrCore1());
// Now Make sure AAA is in 0 and BBB in 1
SolrQuery q = new SolrQuery();
QueryRequest r = new QueryRequest(q);
q.setQuery("id:AAA");
assertEquals(1, r.process(getSolrCore0()).getResults().size());
assertEquals(0, r.process(getSolrCore1()).getResults().size());
assertEquals(1,
getSolrCore0().query(new SolrQuery("id:AAA")).getResults().size());
assertEquals(0,
getSolrCore0().query(new SolrQuery("id:BBB")).getResults().size());
assertEquals(0,
getSolrCore1().query(new SolrQuery("id:AAA")).getResults().size());
assertEquals(1,
getSolrCore1().query(new SolrQuery("id:BBB")).getResults().size());
// Now get the index directory of core1 and merge with core0
String indexDir = getIndexDirCore1();
String name = "core0";
SolrServer coreadmin = getSolrAdmin();
CoreAdminRequest.mergeIndexes(name, new String[] { indexDir }, coreadmin);
// Now commit the merged index
up.clear(); // just do commit
up.process(getSolrCore0());
assertEquals(1,
getSolrCore0().query(new SolrQuery("id:AAA")).getResults().size());
assertEquals(1,
getSolrCore0().query(new SolrQuery("id:BBB")).getResults().size());
}
}

View File

@ -0,0 +1,70 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.client.solrj.embedded;
import java.io.File;
import org.apache.solr.client.solrj.MergeIndexesExampleTestBase;
import org.apache.solr.client.solrj.SolrServer;
import org.apache.solr.core.SolrCore;
/**
* Test for merge indexes command
*
* @since solr 1.4
* @version $Id$
*/
public class MergeIndexesEmbeddedTest extends MergeIndexesExampleTestBase {
@Override
public void setUp() throws Exception {
super.setUp();
File home = new File(getSolrHome());
File f = new File(home, "solr.xml");
cores.load(getSolrHome(), f);
}
@Override
protected SolrServer getSolrCore0() {
return new EmbeddedSolrServer(cores, "core0");
}
@Override
protected SolrServer getSolrCore1() {
return new EmbeddedSolrServer(cores, "core1");
}
@Override
protected SolrServer getSolrCore(String name) {
return new EmbeddedSolrServer(cores, name);
}
@Override
protected SolrServer getSolrAdmin() {
return new EmbeddedSolrServer(cores, "core0");
}
@Override
protected String getIndexDirCore1() {
SolrCore core1 = cores.getCore("core1");
String indexDir = core1.getIndexDir();
core1.close();
return indexDir;
}
}