SOLR-6920, SOLR-6640: A replicated index can end up corrupted when small files end up with the same file name and size.

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1657969 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Mark Robert Miller 2015-02-06 21:38:57 +00:00
parent bd3753e965
commit dfeca5ca16
4 changed files with 168 additions and 83 deletions

View File

@ -616,6 +616,9 @@ Bug Fixes
* olap.* in AnalyticsComponent * olap.* in AnalyticsComponent
(Alexandre Rafalovitch & hossman) (Alexandre Rafalovitch & hossman)
* SOLR-6920: A replicated index can end up corrupted when small files end up with the same
file name and size. (Varun Thacker, Mark Miller)
Optimizations Optimizations
---------------------- ----------------------

View File

@ -33,7 +33,6 @@ import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.Date; import java.util.Date;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Properties; import java.util.Properties;
@ -45,10 +44,13 @@ import java.util.zip.Checksum;
import java.util.zip.DeflaterOutputStream; import java.util.zip.DeflaterOutputStream;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.lucene.codecs.CodecUtil;
import org.apache.lucene.index.DirectoryReader; import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexCommit; import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexDeletionPolicy; import org.apache.lucene.index.IndexDeletionPolicy;
import org.apache.lucene.index.IndexWriter; import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.SegmentCommitInfo;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.store.Directory; import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext; import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput; import org.apache.lucene.store.IndexInput;
@ -425,27 +427,49 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw
List<Map<String, Object>> result = new ArrayList<>(); List<Map<String, Object>> result = new ArrayList<>();
Directory dir = null; Directory dir = null;
try { try {
// get all the files in the commit
// use a set to workaround possible Lucene bug which returns same file
// name multiple times
Collection<String> files = new HashSet<>(commit.getFileNames());
dir = core.getDirectoryFactory().get(core.getNewIndexDir(), DirContext.DEFAULT, core.getSolrConfig().indexConfig.lockType); dir = core.getDirectoryFactory().get(core.getNewIndexDir(), DirContext.DEFAULT, core.getSolrConfig().indexConfig.lockType);
try { SegmentInfos infos = SegmentInfos.readCommit(dir, commit.getSegmentsFileName());
for (SegmentCommitInfo commitInfo : infos) {
for (String fileName : files) { for (String file : commitInfo.files()) {
if (fileName.endsWith(".lock")) continue;
Map<String,Object> fileMeta = new HashMap<>(); Map<String,Object> fileMeta = new HashMap<>();
fileMeta.put(NAME, fileName); fileMeta.put(NAME, file);
fileMeta.put(SIZE, dir.fileLength(fileName)); fileMeta.put(SIZE, dir.fileLength(file));
try (final IndexInput in = dir.openInput(file, IOContext.READONCE)) {
try {
long checksum = CodecUtil.retrieveChecksum(in);
fileMeta.put(CHECKSUM, checksum);
} catch(Exception e) {
LOG.warn("Could not read checksum from index file.", e);
}
}
result.add(fileMeta); result.add(fileMeta);
} }
} finally {
core.getDirectoryFactory().release(dir);
} }
// add the segments_N file
Map<String,Object> fileMeta = new HashMap<>();
fileMeta.put(NAME, infos.getSegmentsFileName());
fileMeta.put(SIZE, dir.fileLength(infos.getSegmentsFileName()));
if (infos.getId() != null) {
try (final IndexInput in = dir.openInput(infos.getSegmentsFileName(), IOContext.READONCE)) {
fileMeta.put(CHECKSUM, CodecUtil.retrieveChecksum(in));
}
}
result.add(fileMeta);
} catch (IOException e) { } catch (IOException e) {
rsp.add("status", "unable to get file names for given index generation"); rsp.add("status", "unable to get file names for given index generation");
rsp.add("exception", e); rsp.add("exception", e);
LOG.error("Unable to get file names for indexCommit generation: " + gen, e); LOG.error("Unable to get file names for indexCommit generation: " + gen, e);
} finally {
if (dir != null) {
try {
core.getDirectoryFactory().release(dir);
} catch (IOException e) {
SolrException.log(LOG, "Could not release directory after fetching file list", e);
}
}
} }
rsp.add(CMD_GET_FILE_LIST, result); rsp.add(CMD_GET_FILE_LIST, result);
if (confFileNameAlias.size() < 1 || core.getCoreDescriptor().getCoreContainer().isZooKeeperAware()) if (confFileNameAlias.size() < 1 || core.getCoreDescriptor().getCoreContainer().isZooKeeperAware())

View File

@ -16,43 +16,25 @@
*/ */
package org.apache.solr.handler; package org.apache.solr.handler;
import org.apache.commons.io.IOUtils; import static org.apache.solr.handler.ReplicationHandler.ALIAS;
import org.apache.http.client.HttpClient; import static org.apache.solr.handler.ReplicationHandler.CHECKSUM;
import org.apache.lucene.index.IndexCommit; import static org.apache.solr.handler.ReplicationHandler.CMD_DETAILS;
import org.apache.lucene.index.IndexWriter; import static org.apache.solr.handler.ReplicationHandler.CMD_GET_FILE;
import org.apache.lucene.index.SegmentCommitInfo; import static org.apache.solr.handler.ReplicationHandler.CMD_GET_FILE_LIST;
import org.apache.lucene.index.SegmentInfos; import static org.apache.solr.handler.ReplicationHandler.CMD_INDEX_VERSION;
import org.apache.lucene.store.Directory; import static org.apache.solr.handler.ReplicationHandler.COMMAND;
import org.apache.lucene.store.IOContext; import static org.apache.solr.handler.ReplicationHandler.COMPRESSION;
import org.apache.lucene.store.IndexInput; import static org.apache.solr.handler.ReplicationHandler.CONF_FILES;
import org.apache.lucene.store.IndexOutput; import static org.apache.solr.handler.ReplicationHandler.CONF_FILE_SHORT;
import org.apache.solr.client.solrj.SolrServerException; import static org.apache.solr.handler.ReplicationHandler.EXTERNAL;
import org.apache.solr.client.solrj.impl.HttpClientUtil; import static org.apache.solr.handler.ReplicationHandler.FILE;
import org.apache.solr.client.solrj.impl.HttpSolrClient; import static org.apache.solr.handler.ReplicationHandler.FILE_STREAM;
import org.apache.solr.client.solrj.request.QueryRequest; import static org.apache.solr.handler.ReplicationHandler.GENERATION;
import org.apache.solr.common.SolrException; import static org.apache.solr.handler.ReplicationHandler.INTERNAL;
import org.apache.solr.common.SolrException.ErrorCode; import static org.apache.solr.handler.ReplicationHandler.MASTER_URL;
import org.apache.solr.common.params.CommonParams; import static org.apache.solr.handler.ReplicationHandler.NAME;
import org.apache.solr.common.params.ModifiableSolrParams; import static org.apache.solr.handler.ReplicationHandler.OFFSET;
import org.apache.solr.common.util.ExecutorUtil; import static org.apache.solr.handler.ReplicationHandler.SIZE;
import org.apache.solr.common.util.FastInputStream;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.core.DirectoryFactory;
import org.apache.solr.core.DirectoryFactory.DirContext;
import org.apache.solr.core.IndexDeletionPolicyWrapper;
import org.apache.solr.core.SolrCore;
import org.apache.solr.handler.ReplicationHandler.FileInfo;
import org.apache.solr.request.LocalSolrQueryRequest;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.search.SolrIndexSearcher;
import org.apache.solr.update.CommitUpdateCommand;
import org.apache.solr.util.DefaultSolrThreadFactory;
import org.apache.solr.util.FileUtils;
import org.apache.solr.util.PropertiesInputStream;
import org.apache.solr.util.PropertiesOutputStream;
import org.apache.solr.util.RefCounted;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File; import java.io.File;
import java.io.FileNotFoundException; import java.io.FileNotFoundException;
@ -94,25 +76,43 @@ import java.util.zip.Adler32;
import java.util.zip.Checksum; import java.util.zip.Checksum;
import java.util.zip.InflaterInputStream; import java.util.zip.InflaterInputStream;
import static org.apache.solr.handler.ReplicationHandler.ALIAS; import org.apache.commons.io.IOUtils;
import static org.apache.solr.handler.ReplicationHandler.CHECKSUM; import org.apache.http.client.HttpClient;
import static org.apache.solr.handler.ReplicationHandler.CMD_DETAILS; import org.apache.lucene.codecs.CodecUtil;
import static org.apache.solr.handler.ReplicationHandler.CMD_GET_FILE; import org.apache.lucene.index.IndexCommit;
import static org.apache.solr.handler.ReplicationHandler.CMD_GET_FILE_LIST; import org.apache.lucene.index.IndexWriter;
import static org.apache.solr.handler.ReplicationHandler.CMD_INDEX_VERSION; import org.apache.lucene.index.SegmentInfos;
import static org.apache.solr.handler.ReplicationHandler.COMMAND; import org.apache.lucene.store.Directory;
import static org.apache.solr.handler.ReplicationHandler.COMPRESSION; import org.apache.lucene.store.IOContext;
import static org.apache.solr.handler.ReplicationHandler.CONF_FILES; import org.apache.lucene.store.IndexInput;
import static org.apache.solr.handler.ReplicationHandler.CONF_FILE_SHORT; import org.apache.lucene.store.IndexOutput;
import static org.apache.solr.handler.ReplicationHandler.EXTERNAL; import org.apache.solr.client.solrj.SolrServerException;
import static org.apache.solr.handler.ReplicationHandler.FILE; import org.apache.solr.client.solrj.impl.HttpClientUtil;
import static org.apache.solr.handler.ReplicationHandler.FILE_STREAM; import org.apache.solr.client.solrj.impl.HttpSolrClient;
import static org.apache.solr.handler.ReplicationHandler.GENERATION; import org.apache.solr.client.solrj.request.QueryRequest;
import static org.apache.solr.handler.ReplicationHandler.INTERNAL; import org.apache.solr.common.SolrException;
import static org.apache.solr.handler.ReplicationHandler.MASTER_URL; import org.apache.solr.common.SolrException.ErrorCode;
import static org.apache.solr.handler.ReplicationHandler.NAME; import org.apache.solr.common.params.CommonParams;
import static org.apache.solr.handler.ReplicationHandler.OFFSET; import org.apache.solr.common.params.ModifiableSolrParams;
import static org.apache.solr.handler.ReplicationHandler.SIZE; import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.common.util.FastInputStream;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.core.DirectoryFactory;
import org.apache.solr.core.DirectoryFactory.DirContext;
import org.apache.solr.core.IndexDeletionPolicyWrapper;
import org.apache.solr.core.SolrCore;
import org.apache.solr.handler.ReplicationHandler.FileInfo;
import org.apache.solr.request.LocalSolrQueryRequest;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.search.SolrIndexSearcher;
import org.apache.solr.update.CommitUpdateCommand;
import org.apache.solr.util.DefaultSolrThreadFactory;
import org.apache.solr.util.FileUtils;
import org.apache.solr.util.PropertiesInputStream;
import org.apache.solr.util.PropertiesOutputStream;
import org.apache.solr.util.RefCounted;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** /**
* <p/> Provides functionality of downloading changed index files as well as config files and a timer for scheduling fetches from the * <p/> Provides functionality of downloading changed index files as well as config files and a timer for scheduling fetches from the
@ -445,8 +445,7 @@ public class SnapPuller {
+ isFullCopyNeeded); + isFullCopyNeeded);
successfulInstall = false; successfulInstall = false;
downloadIndexFiles(isFullCopyNeeded, indexDir, tmpIndexDir, downloadIndexFiles(isFullCopyNeeded, indexDir, tmpIndexDir, latestGeneration);
latestGeneration);
LOG.info("Total time taken for download : " LOG.info("Total time taken for download : "
+ ((System.currentTimeMillis() - replicationStartTime) / 1000) + ((System.currentTimeMillis() - replicationStartTime) / 1000)
+ " secs"); + " secs");
@ -796,15 +795,17 @@ public class SnapPuller {
* @param indexDir the indexDir to be merged to * @param indexDir the indexDir to be merged to
* @param latestGeneration the version number * @param latestGeneration the version number
*/ */
private void downloadIndexFiles(boolean downloadCompleteIndex, private void downloadIndexFiles(boolean downloadCompleteIndex, Directory indexDir, Directory tmpIndexDir, long latestGeneration)
Directory indexDir, Directory tmpIndexDir, long latestGeneration)
throws Exception { throws Exception {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Download files to dir: " + Arrays.asList(indexDir.listAll())); LOG.debug("Download files to dir: " + Arrays.asList(indexDir.listAll()));
} }
for (Map<String,Object> file : filesToDownload) { for (Map<String,Object> file : filesToDownload) {
if (!slowFileExists(indexDir, (String) file.get(NAME)) String filename = (String) file.get(NAME);
|| downloadCompleteIndex) { CompareResult compareResult = compareFile(indexDir, filename, (Long) file.get(SIZE), (Long) file.get(CHECKSUM));
if (!compareResult.equal || downloadCompleteIndex
|| (!compareResult.checkSummed && (filename.endsWith(".si") || filename.endsWith(".liv")
|| filename.startsWith("segments_")))) {
dirFileFetcher = new DirectoryFileFetcher(tmpIndexDir, file, dirFileFetcher = new DirectoryFileFetcher(tmpIndexDir, file,
(String) file.get(NAME), false, latestGeneration); (String) file.get(NAME), false, latestGeneration);
currentFile = file; currentFile = file;
@ -817,6 +818,54 @@ public class SnapPuller {
} }
} }
static class CompareResult {
boolean equal = false;
boolean checkSummed = false;
}
private CompareResult compareFile(Directory indexDir, String filename, Long backupIndexFileLen, Long backupIndexFileChecksum) {
CompareResult compareResult = new CompareResult();
try {
try (final IndexInput indexInput = indexDir.openInput(filename, IOContext.READONCE)) {
long indexFileLen = indexInput.length();
long indexFileChecksum = 0;
try {
indexFileChecksum = CodecUtil.retrieveChecksum(indexInput);
compareResult.checkSummed = true;
} catch (Exception e) {
LOG.warn("Could not retrieve checksum from file.", e);
if (indexFileLen == backupIndexFileLen) {
compareResult.equal = true;
return compareResult;
} else {
LOG.warn("File {} did not match. expected checksum is {} and actual is checksum {}. " +
"expected length is {} and actual length is {}", filename, backupIndexFileChecksum, indexFileChecksum,
backupIndexFileLen, indexFileLen);
compareResult.equal = false;
return compareResult;
}
}
if (indexFileLen == backupIndexFileLen && indexFileChecksum == backupIndexFileChecksum) {
compareResult.equal = true;
return compareResult;
} else {
LOG.warn("File {} did not match. expected checksum is {} and actual is checksum {}. " +
"expected length is {} and actual length is {}", filename, backupIndexFileChecksum, indexFileChecksum,
backupIndexFileLen, indexFileLen);
compareResult.equal = false;
return compareResult;
}
}
} catch (IOException e) {
LOG.error("Could not read file " + filename + ". Downloading it again", e);
compareResult.equal = false;
return compareResult;
}
}
/** Returns true if the file exists (can be opened), false /** Returns true if the file exists (can be opened), false
* if it cannot be opened, and (unlike Java's * if it cannot be opened, and (unlike Java's
* File.exists) throws IOException if there's some * File.exists) throws IOException if there's some
@ -839,14 +888,23 @@ public class SnapPuller {
*/ */
private boolean isIndexStale(Directory dir) throws IOException { private boolean isIndexStale(Directory dir) throws IOException {
for (Map<String, Object> file : filesToDownload) { for (Map<String, Object> file : filesToDownload) {
if (slowFileExists(dir, (String) file.get(NAME)) String filename = (String) file.get(NAME);
&& dir.fileLength((String) file.get(NAME)) != (Long) file.get(SIZE)) { Long length = (Long) file.get(SIZE);
LOG.warn("File " + file.get(NAME) + " expected to be " + file.get(SIZE) Long checksum = (Long) file.get(CHECKSUM);
+ " while it is " + dir.fileLength((String) file.get(NAME))); if (slowFileExists(dir, filename)) {
// file exists and size is different, therefore we must assume if (checksum != null) {
// corrupted index if (!(compareFile(dir, filename, length, checksum).equal)) {
// file exists and size or checksum is different, therefore we must download it again
return true; return true;
} }
} else {
if (length != dir.fileLength(filename)) {
LOG.warn("File {} did not match. expected length is {} and actual length is {}",
filename, length, dir.fileLength(filename));
return true;
}
}
}
} }
return false; return false;
} }

View File

@ -138,7 +138,7 @@ public class HdfsTestUtil {
String dir = uri.toString() String dir = uri.toString()
+ "/" + "/"
+ new File(dataDir).toString().replaceAll(":", "_") + new File(dataDir).toString().replaceAll(":", "_")
.replaceAll("/", "_"); .replaceAll("/", "_").replaceAll(" ", "_");
return dir; return dir;
} }