SOLR-1331 -- Added a srcCore parameter to CoreAdminHandler's mergeindexes action to merge one or more cores' indexes to a target core

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1137533 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Shalin Shekhar Mangar 2011-06-20 07:32:12 +00:00
parent 3d7ebb0627
commit 532f443146
7 changed files with 120 additions and 33 deletions

View File

@ -266,6 +266,9 @@ New Features
Karsten Sperling, Michael Gundlach, Oleg Gnatovskiy, Thomas Traeger, Karsten Sperling, Michael Gundlach, Oleg Gnatovskiy, Thomas Traeger,
Harish Agarwal, yonik, Michael McCandless, Bill Bell) Harish Agarwal, yonik, Michael McCandless, Bill Bell)
* SOLR-1331 -- Added a srcCore parameter to CoreAdminHandler's mergeindexes action
to merge one or more cores' indexes to a target core (shalin)
Optimizations Optimizations
---------------------- ----------------------

View File

@ -59,6 +59,10 @@ public interface CoreAdminParams
* The directories are specified by multiple indexDir parameters. */ * The directories are specified by multiple indexDir parameters. */
public final static String INDEX_DIR = "indexDir"; public final static String INDEX_DIR = "indexDir";
/** If you merge indexes, what is the source core's name
* More than one source core can be specified by multiple srcCore parameters */
public final static String SRC_CORE = "srcCore";
/** The collection name in solr cloud */ /** The collection name in solr cloud */
public final static String COLLECTION = "collection"; public final static String COLLECTION = "collection";

View File

@ -17,6 +17,9 @@
package org.apache.solr.handler.admin; package org.apache.solr.handler.admin;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.util.IOUtils;
import org.apache.solr.cloud.CloudDescriptor; import org.apache.solr.cloud.CloudDescriptor;
import org.apache.solr.common.SolrException; import org.apache.solr.common.SolrException;
import org.apache.solr.common.params.CoreAdminParams; import org.apache.solr.common.params.CoreAdminParams;
@ -42,7 +45,9 @@ import org.apache.lucene.store.Directory;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList;
import java.util.Date; import java.util.Date;
import java.util.List;
/** /**
* *
@ -171,22 +176,53 @@ public class CoreAdminHandler extends RequestHandlerBase {
} }
protected boolean handleMergeAction(SolrQueryRequest req, SolrQueryResponse rsp) throws IOException { protected boolean handleMergeAction(SolrQueryRequest req, SolrQueryResponse rsp) throws IOException {
boolean doPersist = false;
SolrParams params = req.getParams(); SolrParams params = req.getParams();
SolrParams required = params.required(); String cname = params.required().get(CoreAdminParams.CORE);
String cname = required.get(CoreAdminParams.CORE);
SolrCore core = coreContainer.getCore(cname); SolrCore core = coreContainer.getCore(cname);
SolrQueryRequest wrappedReq = null; SolrQueryRequest wrappedReq = null;
SolrCore[] sourceCores = null;
RefCounted<SolrIndexSearcher>[] searchers = null;
// stores readers created from indexDir param values
IndexReader[] readersToBeClosed = null;
if (core != null) { if (core != null) {
try { try {
doPersist = coreContainer.isPersistent(); String[] dirNames = params.getParams(CoreAdminParams.INDEX_DIR);
if (dirNames == null || dirNames.length == 0) {
String[] sources = params.getParams("srcCore");
if (sources == null || sources.length == 0)
throw new SolrException( SolrException.ErrorCode.BAD_REQUEST,
"At least one indexDir or srcCore must be specified");
String[] dirNames = required.getParams(CoreAdminParams.INDEX_DIR); sourceCores = new SolrCore[sources.length];
for (int i = 0; i < sources.length; i++) {
String source = sources[i];
SolrCore srcCore = coreContainer.getCore(source);
if (srcCore == null)
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
"Core: " + source + " does not exist");
sourceCores[i] = srcCore;
}
} else {
readersToBeClosed = new IndexReader[dirNames.length];
DirectoryFactory dirFactory = core.getDirectoryFactory();
for (int i = 0; i < dirNames.length; i++) {
readersToBeClosed[i] = IndexReader.open(dirFactory.open(dirNames[i]), true);
}
}
DirectoryFactory dirFactory = core.getDirectoryFactory(); IndexReader[] readers = null;
Directory[] dirs = new Directory[dirNames.length]; if (readersToBeClosed != null) {
for (int i = 0; i < dirNames.length; i++) { readers = readersToBeClosed;
dirs[i] = dirFactory.open(dirNames[i]); } else {
readers = new IndexReader[sourceCores.length];
searchers = new RefCounted[sourceCores.length];
for (int i = 0; i < sourceCores.length; i++) {
SolrCore solrCore = sourceCores[i];
// record the searchers so that we can decref
searchers[i] = solrCore.getSearcher();
readers[i] = searchers[i].get().getIndexReader();
}
} }
UpdateRequestProcessorChain processorChain = UpdateRequestProcessorChain processorChain =
@ -194,13 +230,24 @@ public class CoreAdminHandler extends RequestHandlerBase {
wrappedReq = new LocalSolrQueryRequest(core, req.getParams()); wrappedReq = new LocalSolrQueryRequest(core, req.getParams());
UpdateRequestProcessor processor = UpdateRequestProcessor processor =
processorChain.createProcessor(wrappedReq, rsp); processorChain.createProcessor(wrappedReq, rsp);
processor.processMergeIndexes(new MergeIndexesCommand(dirs, req)); processor.processMergeIndexes(new MergeIndexesCommand(readers, req));
} finally { } finally {
if (searchers != null) {
for (RefCounted<SolrIndexSearcher> searcher : searchers) {
if (searcher != null) searcher.decref();
}
}
if (sourceCores != null) {
for (SolrCore solrCore : sourceCores) {
if (solrCore != null) solrCore.close();
}
}
if (readersToBeClosed != null) IOUtils.closeSafely(true, readersToBeClosed);
if (wrappedReq != null) wrappedReq.close();
core.close(); core.close();
wrappedReq.close();
} }
} }
return doPersist; return coreContainer.isPersistent();
} }
/** /**

View File

@ -20,6 +20,7 @@
package org.apache.solr.update; package org.apache.solr.update;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter; import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.Term; import org.apache.lucene.index.Term;
import org.apache.lucene.queryParser.ParseException; import org.apache.lucene.queryParser.ParseException;
@ -277,10 +278,10 @@ public class DirectUpdateHandler2 extends UpdateHandler {
try { try {
log.info("start " + cmd); log.info("start " + cmd);
Directory[] dirs = cmd.dirs; IndexReader[] readers = cmd.readers;
if (dirs != null && dirs.length > 0) { if (readers != null && readers.length > 0) {
openWriter(); openWriter();
writer.addIndexes(dirs); writer.addIndexes(readers);
rc = 1; rc = 1;
} else { } else {
rc = 0; rc = 0;

View File

@ -17,6 +17,7 @@
package org.apache.solr.update; package org.apache.solr.update;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.store.Directory; import org.apache.lucene.store.Directory;
import org.apache.solr.request.SolrQueryRequest; import org.apache.solr.request.SolrQueryRequest;
@ -27,25 +28,21 @@ import org.apache.solr.request.SolrQueryRequest;
* *
*/ */
public class MergeIndexesCommand extends UpdateCommand { public class MergeIndexesCommand extends UpdateCommand {
public Directory[] dirs; public IndexReader[] readers;
public MergeIndexesCommand(SolrQueryRequest req) { public MergeIndexesCommand(IndexReader[] readers, SolrQueryRequest req) {
this(null, req);
}
public MergeIndexesCommand(Directory[] dirs, SolrQueryRequest req) {
super("mergeIndexes", req); super("mergeIndexes", req);
this.dirs = dirs; this.readers = readers;
} }
@Override @Override
public String toString() { public String toString() {
StringBuilder sb = new StringBuilder(commandName); StringBuilder sb = new StringBuilder(commandName);
sb.append(':'); sb.append(':');
if (dirs != null && dirs.length > 0) { if (readers != null && readers.length > 0) {
sb.append(dirs[0]); sb.append(readers[0].directory());
for (int i = 1; i < dirs.length; i++) { for (int i = 1; i < readers.length; i++) {
sb.append(",").append(dirs[i]); sb.append(",").append(readers[i].directory());
} }
} }
return sb.toString(); return sb.toString();

View File

@ -119,6 +119,7 @@ public class CoreAdminRequest extends SolrRequest
public static class MergeIndexes extends CoreAdminRequest { public static class MergeIndexes extends CoreAdminRequest {
protected List<String> indexDirs; protected List<String> indexDirs;
protected List<String> srcCores;
public MergeIndexes() { public MergeIndexes() {
action = CoreAdminAction.MERGEINDEXES; action = CoreAdminAction.MERGEINDEXES;
@ -132,6 +133,14 @@ public class CoreAdminRequest extends SolrRequest
return indexDirs; return indexDirs;
} }
public List<String> getSrcCores() {
return srcCores;
}
public void setSrcCores(List<String> srcCores) {
this.srcCores = srcCores;
}
@Override @Override
public SolrParams getParams() { public SolrParams getParams() {
if (action == null) { if (action == null) {
@ -145,6 +154,11 @@ public class CoreAdminRequest extends SolrRequest
params.set(CoreAdminParams.INDEX_DIR, indexDir); params.set(CoreAdminParams.INDEX_DIR, indexDir);
} }
} }
if (srcCores != null) {
for (String srcCore : srcCores) {
params.set(CoreAdminParams.SRC_CORE, srcCore);
}
}
return params; return params;
} }
} }
@ -289,11 +303,12 @@ public class CoreAdminRequest extends SolrRequest
} }
public static CoreAdminResponse mergeIndexes(String name, public static CoreAdminResponse mergeIndexes(String name,
String[] indexDirs, SolrServer server) throws SolrServerException, String[] indexDirs, String[] srcCores, SolrServer server) throws SolrServerException,
IOException { IOException {
CoreAdminRequest.MergeIndexes req = new CoreAdminRequest.MergeIndexes(); CoreAdminRequest.MergeIndexes req = new CoreAdminRequest.MergeIndexes();
req.setCoreName(name); req.setCoreName(name);
req.setIndexDirs(Arrays.asList(indexDirs)); req.setIndexDirs(Arrays.asList(indexDirs));
req.setSrcCores(Arrays.asList(srcCores));
return req.process(server); return req.process(server);
} }
} }

View File

@ -17,6 +17,7 @@
package org.apache.solr.client.solrj; package org.apache.solr.client.solrj;
import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
import org.apache.solr.client.solrj.request.CoreAdminRequest; import org.apache.solr.client.solrj.request.CoreAdminRequest;
import org.apache.solr.client.solrj.request.QueryRequest; import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.client.solrj.request.UpdateRequest; import org.apache.solr.client.solrj.request.UpdateRequest;
@ -26,6 +27,8 @@ import org.apache.solr.core.CoreContainer;
import org.apache.solr.core.SolrCore; import org.apache.solr.core.SolrCore;
import org.apache.solr.util.ExternalPaths; import org.apache.solr.util.ExternalPaths;
import java.io.IOException;
/** /**
* Abstract base class for testing merge indexes command * Abstract base class for testing merge indexes command
* *
@ -79,9 +82,9 @@ public abstract class MergeIndexesExampleTestBase extends SolrExampleTestBase {
protected abstract String getIndexDirCore1(); protected abstract String getIndexDirCore1();
public void testMergeIndexes() throws Exception { private UpdateRequest setupCores() throws SolrServerException, IOException {
UpdateRequest up = new UpdateRequest(); UpdateRequest up = new UpdateRequest();
up.setAction(ACTION.COMMIT, true, true); up.setAction(AbstractUpdateRequest.ACTION.COMMIT, true, true);
up.deleteByQuery("*:*"); up.deleteByQuery("*:*");
up.process(getSolrCore0()); up.process(getSolrCore0());
up.process(getSolrCore1()); up.process(getSolrCore1());
@ -119,11 +122,28 @@ public abstract class MergeIndexesExampleTestBase extends SolrExampleTestBase {
assertEquals(1, assertEquals(1,
getSolrCore1().query(new SolrQuery("id:BBB")).getResults().size()); getSolrCore1().query(new SolrQuery("id:BBB")).getResults().size());
return up;
}
public void testMergeIndexesByDirName() throws Exception {
UpdateRequest up = setupCores();
// Now get the index directory of core1 and merge with core0 // Now get the index directory of core1 and merge with core0
String indexDir = getIndexDirCore1(); CoreAdminRequest.mergeIndexes("core0", new String[] {getIndexDirCore1()}, new String[0], getSolrAdmin());
String name = "core0";
SolrServer coreadmin = getSolrAdmin(); // Now commit the merged index
CoreAdminRequest.mergeIndexes(name, new String[] { indexDir }, coreadmin); up.clear(); // just do commit
up.process(getSolrCore0());
assertEquals(1,
getSolrCore0().query(new SolrQuery("id:AAA")).getResults().size());
assertEquals(1,
getSolrCore0().query(new SolrQuery("id:BBB")).getResults().size());
}
public void testMergeIndexesByCoreName() throws Exception {
UpdateRequest up = setupCores();
CoreAdminRequest.mergeIndexes("core0", new String[0], new String[] {"core1"}, getSolrAdmin());
// Now commit the merged index // Now commit the merged index
up.clear(); // just do commit up.clear(); // just do commit