SOLR-5397: Replication can fail silently in some cases.

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1542884 13f79535-47bb-0310-9956-ffa450edef68
Mark Robert Miller 2013-11-18 04:00:56 +00:00
parent 6e78f9a497
commit e8fbefc83d
4 changed files with 114 additions and 103 deletions

solr/CHANGES.txt

@@ -226,6 +226,8 @@ Bug Fixes
  unloaded results in a "Too many close [count:-1]" error.
(Olivier Soyez via Erick Erickson)
* SOLR-5397: Replication can fail silently in some cases. (Mark Miller)

Optimizations
----------------------

solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java

@@ -19,12 +19,14 @@ package org.apache.solr.cloud;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.store.Directory;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.HttpSolrServer;
import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
@@ -41,6 +43,7 @@ import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.UpdateParams;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.core.CoreDescriptor;
import org.apache.solr.core.DirectoryFactory.DirContext;
import org.apache.solr.core.RequestHandlers.LazyRequestHandlerWrapper;
import org.apache.solr.core.SolrCore;
import org.apache.solr.handler.ReplicationHandler;
@@ -161,6 +164,7 @@ public class RecoveryStrategy extends Thread implements ClosableThread {
RefCounted<SolrIndexSearcher> searchHolder = core
.getNewestSearcher(false);
SolrIndexSearcher searcher = searchHolder.get();
Directory dir = core.getDirectoryFactory().get(core.getIndexDir(), DirContext.META_DATA, null);
try {
log.debug(core.getCoreDescriptor().getCoreContainer()
.getZkController().getNodeName()
@@ -170,8 +174,12 @@ public class RecoveryStrategy extends Thread implements ClosableThread {
+ leaderUrl
+ " gen:"
+ core.getDeletionPolicy().getLatestCommit().getGeneration()
+ " data:" + core.getDataDir());
+ " data:" + core.getDataDir()
+ " index:" + core.getIndexDir()
+ " newIndex:" + core.getNewIndexDir()
+ " files:" + Arrays.asList(dir.listAll()));
} finally {
core.getDirectoryFactory().release(dir);
searchHolder.decref();
}
} catch (Exception e) {
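The logging added above borrows a reference-counted Directory from the core's DirectoryFactory, so it must be released even if listing the files fails; that is why the release sits in the finally block next to the searcher decref. A minimal sketch of that acquire/release pattern, assuming the same core and log fields as the surrounding class:

Directory dir = core.getDirectoryFactory().get(core.getIndexDir(),
    DirContext.META_DATA, null);
try {
  // listAll() can throw IOException; the caller's catch block handles it
  log.debug("index files: " + Arrays.asList(dir.listAll()));
} finally {
  core.getDirectoryFactory().release(dir); // hand the reference back so the directory can be closed
}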

solr/core/src/java/org/apache/solr/handler/SnapPuller.java

@@ -16,6 +16,7 @@
*/
package org.apache.solr.handler;
import static org.apache.lucene.util.IOUtils.CHARSET_UTF_8;
import static org.apache.solr.handler.ReplicationHandler.ALIAS;
import static org.apache.solr.handler.ReplicationHandler.CHECKSUM;
import static org.apache.solr.handler.ReplicationHandler.CMD_DETAILS;
@@ -47,6 +48,7 @@ import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
@@ -75,9 +77,6 @@ import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput;
import static org.apache.lucene.util.IOUtils.CHARSET_UTF_8;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.HttpClientUtil;
import org.apache.solr.client.solrj.impl.HttpSolrServer;
@@ -104,6 +103,7 @@ import org.apache.solr.util.FileUtils;
import org.apache.solr.util.PropertiesInputStream;
import org.apache.solr.util.PropertiesOutputStream;
import org.apache.solr.util.RefCounted;
import org.eclipse.jetty.util.log.Log;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -388,8 +388,8 @@ public class SnapPuller {
fsyncService = Executors.newSingleThreadExecutor(new DefaultSolrThreadFactory("fsyncService"));
// use a synchronized list because the list is read by other threads (to show details)
filesDownloaded = Collections.synchronizedList(new ArrayList<Map<String, Object>>());
// if the generateion of master is older than that of the slave , it means they are not compatible to be copied
// then a new index direcory to be created and all the files need to be copied
// if the generation of master is older than that of the slave , it means they are not compatible to be copied
// then a new index directory to be created and all the files need to be copied
boolean isFullCopyNeeded = IndexDeletionPolicyWrapper
.getCommitTimestamp(commit) >= latestVersion
|| commit.getGeneration() >= latestGeneration || forceReplication;
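To make the comment above concrete, a small worked example with illustrative numbers (not taken from a real run):

// master reports:        latestVersion = 1384750000000, latestGeneration = 10
// slave's local commit:  timestamp     = 1384760000000, generation       = 12
// Both comparisons hold (timestamp >= latestVersion, generation >= latestGeneration),
// so the indexes have diverged: isFullCopyNeeded becomes true and all files are
// fetched into a fresh index directory instead of copying only the missing ones.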
@@ -408,57 +408,66 @@ public class SnapPuller {
if (isIndexStale(indexDir)) {
isFullCopyNeeded = true;
}
LOG.info("Starting download to " + tmpIndexDir + " fullCopy=" + isFullCopyNeeded);
successfulInstall = false;
downloadIndexFiles(isFullCopyNeeded, tmpIndexDir, latestGeneration);
LOG.info("Total time taken for download : " + ((System.currentTimeMillis() - replicationStartTime) / 1000) + " secs");
Collection<Map<String, Object>> modifiedConfFiles = getModifiedConfFiles(confFilesToDownload);
if (!modifiedConfFiles.isEmpty()) {
downloadConfFiles(confFilesToDownload, latestGeneration);
if (isFullCopyNeeded) {
successfulInstall = modifyIndexProps(tmpIdxDirName);
deleteTmpIdxDir = false;
} else {
solrCore.getUpdateHandler().getSolrCoreState()
.closeIndexWriter(core, true);
try {
successfulInstall = moveIndexFiles(tmpIndexDir, indexDir);
} finally {
solrCore.getUpdateHandler().getSolrCoreState()
.openIndexWriter(core);
}
}
if (successfulInstall) {
if (!isFullCopyNeeded) {
// rollback - and do it before we download any files
// so we don't remove files we thought we didn't need
// to download later
solrCore.getUpdateHandler().getSolrCoreState()
.closeIndexWriter(core, true);
}
try {
LOG.info("Starting download to " + tmpIndexDir + " fullCopy="
+ isFullCopyNeeded);
successfulInstall = false;
downloadIndexFiles(isFullCopyNeeded, indexDir, tmpIndexDir,
latestGeneration);
LOG.info("Total time taken for download : "
+ ((System.currentTimeMillis() - replicationStartTime) / 1000)
+ " secs");
Collection<Map<String,Object>> modifiedConfFiles = getModifiedConfFiles(confFilesToDownload);
if (!modifiedConfFiles.isEmpty()) {
downloadConfFiles(confFilesToDownload, latestGeneration);
if (isFullCopyNeeded) {
// let the system know we are changing dir's and the old one
// may be closed
if (indexDir != null) {
LOG.info("removing old index directory " + indexDir);
core.getDirectoryFactory().doneWithDirectory(indexDir);
core.getDirectoryFactory().remove(indexDir);
}
}
LOG.info("Configuration files are modified, core will be reloaded");
logReplicationTimeAndConfFiles(modifiedConfFiles, successfulInstall);//write to a file time of replication and conf files.
reloadCore();
}
} else {
terminateAndWaitFsyncService();
if (isFullCopyNeeded) {
successfulInstall = modifyIndexProps(tmpIdxDirName);
deleteTmpIdxDir = false;
} else {
solrCore.getUpdateHandler().getSolrCoreState().closeIndexWriter(core, true);
try {
successfulInstall = modifyIndexProps(tmpIdxDirName);
deleteTmpIdxDir = false;
} else {
successfulInstall = moveIndexFiles(tmpIndexDir, indexDir);
} finally {
solrCore.getUpdateHandler().getSolrCoreState().openIndexWriter(core);
}
if (successfulInstall) {
if (isFullCopyNeeded) {
// let the system know we are changing dir's and the old one
// may be closed
if (indexDir != null) {
LOG.info("removing old index directory " + indexDir);
core.getDirectoryFactory().doneWithDirectory(indexDir);
core.getDirectoryFactory().remove(indexDir);
}
}
LOG.info("Configuration files are modified, core will be reloaded");
logReplicationTimeAndConfFiles(modifiedConfFiles,
successfulInstall);// write to a file time of replication and
// conf files.
reloadCore();
}
} else {
terminateAndWaitFsyncService();
if (isFullCopyNeeded) {
successfulInstall = modifyIndexProps(tmpIdxDirName);
deleteTmpIdxDir = false;
} else {
successfulInstall = moveIndexFiles(tmpIndexDir, indexDir);
}
if (successfulInstall) {
logReplicationTimeAndConfFiles(modifiedConfFiles,
successfulInstall);
}
}
if (successfulInstall) {
logReplicationTimeAndConfFiles(modifiedConfFiles, successfulInstall);
} finally {
if (!isFullCopyNeeded) {
solrCore.getUpdateHandler().getSolrCoreState().openIndexWriter(core);
}
}
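The restructured block above is the heart of the SOLR-5397 fix: for a non-full copy the IndexWriter is rolled back and closed before any files are downloaded, the whole download/install sequence runs inside a try block, and the writer is reopened in a finally block, so a failure during download can no longer leave the core without an open writer while the error goes unnoticed. Reduced to a skeleton (a sketch only; the install and reload steps are elided, and the SolrCoreState variable is introduced here for brevity):

SolrCoreState coreState = solrCore.getUpdateHandler().getSolrCoreState();
if (!isFullCopyNeeded) {
  // rollback and close before downloading, so files we still need are not deleted later
  coreState.closeIndexWriter(core, true);
}
try {
  downloadIndexFiles(isFullCopyNeeded, indexDir, tmpIndexDir, latestGeneration);
  // ... install the downloaded files; reload the core if config files changed ...
} finally {
  if (!isFullCopyNeeded) {
    // always reopen; an exception thrown in the try block still propagates to the caller
    coreState.openIndexWriter(core);
  }
}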
@@ -732,29 +741,28 @@ public class SnapPuller {
* Download the index files. If a new index is needed, download all the files.
*
* @param downloadCompleteIndex is it a fresh index copy
* @param tmpIndexDir the directory to which files need to be downloadeed to
* @param tmpIndexDir the directory to which files need to be downloadeed to
* @param indexDir the indexDir to be merged to
* @param latestGeneration the version number
*/
private void downloadIndexFiles(boolean downloadCompleteIndex,
Directory tmpIndexDir, long latestGeneration) throws Exception {
String indexDir = solrCore.getIndexDir();
// it's okay to use null for lock factory since we know this dir will exist
Directory dir = solrCore.getDirectoryFactory().get(indexDir, DirContext.DEFAULT, solrCore.getSolrConfig().indexConfig.lockType);
try {
for (Map<String,Object> file : filesToDownload) {
if (!dir.fileExists((String) file.get(NAME)) || downloadCompleteIndex) {
dirFileFetcher = new DirectoryFileFetcher(tmpIndexDir, file,
(String) file.get(NAME), false, latestGeneration);
currentFile = file;
dirFileFetcher.fetchFile();
filesDownloaded.add(new HashMap<String,Object>(file));
} else {
LOG.info("Skipping download for " + file.get(NAME) + " because it already exists");
}
Directory indexDir, Directory tmpIndexDir, long latestGeneration)
throws Exception {
if (LOG.isDebugEnabled()) {
LOG.debug("Download files to dir: " + Arrays.asList(indexDir.listAll()));
}
for (Map<String,Object> file : filesToDownload) {
if (!indexDir.fileExists((String) file.get(NAME))
|| downloadCompleteIndex) {
dirFileFetcher = new DirectoryFileFetcher(tmpIndexDir, file,
(String) file.get(NAME), false, latestGeneration);
currentFile = file;
dirFileFetcher.fetchFile();
filesDownloaded.add(new HashMap<String,Object>(file));
} else {
LOG.info("Skipping download for " + file.get(NAME)
+ " because it already exists");
}
} finally {
solrCore.getDirectoryFactory().release(dir);
}
}
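The method now receives the live index Directory along with the temporary one, so the per-file existence check uses a directory the caller already holds instead of acquiring and releasing its own reference here. A sketch of how the caller can manage that lifetime (illustrative; the actual acquisition in fetchLatestIndex is not shown in this hunk):

Directory indexDir = solrCore.getDirectoryFactory().get(solrCore.getIndexDir(),
    DirContext.DEFAULT, solrCore.getSolrConfig().indexConfig.lockType);
try {
  downloadIndexFiles(isFullCopyNeeded, indexDir, tmpIndexDir, latestGeneration);
} finally {
  solrCore.getDirectoryFactory().release(indexDir);
}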
@@ -782,6 +790,7 @@ public class SnapPuller {
* <p/>
*/
private boolean moveAFile(Directory tmpIdxDir, Directory indexDir, String fname, List<String> copiedfiles) {
LOG.debug("Moving file: {}", fname);
boolean success = false;
try {
if (indexDir.fileExists(fname)) {
@@ -805,6 +814,14 @@ public class SnapPuller {
* Copy all index files from the temp index dir to the actual index. The segments_N file is copied last.
*/
private boolean moveIndexFiles(Directory tmpIdxDir, Directory indexDir) {
if (LOG.isDebugEnabled()) {
try {
LOG.info("From dir files:" + Arrays.asList(tmpIdxDir.listAll()));
LOG.info("To dir files:" + Arrays.asList(indexDir.listAll()));
} catch (IOException e) {
throw new RuntimeException(e);
}
}
String segmentsFile = null;
List<String> movedfiles = new ArrayList<String>();
for (Map<String, Object> f : filesDownloaded) {
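The javadoc above states the install step's key invariant: every data file is moved before segments_N, so a reader can never see a commit point that references files which have not arrived yet. A condensed sketch of enforcing that ordering with the moveAFile helper shown above (illustrative; the method body continues beyond the lines quoted here):

String segmentsFile = null;
List<String> movedfiles = new ArrayList<String>();
for (Map<String, Object> f : filesDownloaded) {
  String fname = (String) f.get(NAME);
  if (fname.startsWith("segments_")) {
    segmentsFile = fname; // defer the commit point until everything else is in place
    continue;
  }
  if (!moveAFile(tmpIdxDir, indexDir, fname, movedfiles)) return false;
  movedfiles.add(fname);
}
// only now expose the new commit point
return segmentsFile == null || moveAFile(tmpIdxDir, indexDir, segmentsFile, movedfiles);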

solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java

@@ -17,10 +17,24 @@ package org.apache.solr.update.processor;
* limitations under the License.
*/
import org.apache.http.client.HttpClient;
import static org.apache.solr.update.processor.DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.CharsRef;
import org.apache.solr.client.solrj.impl.HttpClientUtil;
import org.apache.solr.client.solrj.impl.HttpSolrServer;
import org.apache.solr.client.solrj.request.CoreAdminRequest.RequestRecovery;
import org.apache.solr.cloud.CloudDescriptor;
@@ -51,10 +65,7 @@ import org.apache.solr.common.params.UpdateParams;
import org.apache.solr.common.util.Hash;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.core.CoreDescriptor;
import org.apache.solr.core.SolrCore;
import org.apache.solr.handler.component.HttpShardHandlerFactory;
import org.apache.solr.handler.component.RealTimeGetComponent;
import org.apache.solr.handler.component.ShardHandler;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.request.SolrRequestInfo;
import org.apache.solr.response.SolrQueryResponse;
@@ -78,22 +89,6 @@ import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import static org.apache.solr.update.processor.DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM;
// NOT mt-safe... create a new processor for each add thread
// TODO: we really should not wait for distrib after local? unless a certain replication factor is asked for
public class DistributedUpdateProcessor extends UpdateRequestProcessor {
@@ -122,17 +117,6 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
}
}
}
private final HttpClient client;
{
ModifiableSolrParams params = new ModifiableSolrParams();
params.set(HttpClientUtil.PROP_MAX_CONNECTIONS, 10000);
params.set(HttpClientUtil.PROP_MAX_CONNECTIONS_PER_HOST, 20);
params.set(HttpClientUtil.PROP_CONNECTION_TIMEOUT, 15000);
params.set(HttpClientUtil.PROP_SO_TIMEOUT, 60000);
params.set(HttpClientUtil.PROP_USE_RETRY, false);
client = HttpClientUtil.createClient(params);
}
public static final String COMMIT_END_POINT = "commit_end_point";
public static final String LOG_REPLAY = "log_replay";
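The removed initializer block above had been building a fully configured HttpClient for every DistributedUpdateProcessor instance. This hunk does not show what replaces it, so the snippet below is only a sketch of creating one shared client with the same HttpClientUtil parameters, built once and reused rather than per processor:

ModifiableSolrParams params = new ModifiableSolrParams();
params.set(HttpClientUtil.PROP_MAX_CONNECTIONS, 10000);        // total pooled connections
params.set(HttpClientUtil.PROP_MAX_CONNECTIONS_PER_HOST, 20);
params.set(HttpClientUtil.PROP_CONNECTION_TIMEOUT, 15000);     // ms to establish a connection
params.set(HttpClientUtil.PROP_SO_TIMEOUT, 60000);             // ms socket read timeout
params.set(HttpClientUtil.PROP_USE_RETRY, false);
HttpClient sharedClient = HttpClientUtil.createClient(params); // create once, share across processors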