mirror of https://github.com/apache/lucene.git
SOLR-10120: A SolrCore reload can remove the index from the previous SolrCore during replication index rollover.
SOLR-10124: Replication can skip removing a temporary index directory in some cases when it should not.
SOLR-10119: Harden TestReplicationHandler.
This commit is contained in:
parent 11a54aa5a9
commit 2196663156
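
Both bug fixes trace back to the same hazard: after a core reload, the most recent "old" index directory can still be in use by the previous SolrCore until it fully closes, so eager cleanup may delete a live index (SOLR-10119 hardens the tests that exercise this). A minimal sketch of the guard the DirectoryFactory hunks below introduce; names follow the diff, but this is an illustration, not the actual Solr code:

    import java.io.File;
    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;

    class CleanupGuardSketch {
      // Sort old index directories newest-first; after a core reload, skip the
      // newest one - the previous SolrCore may still be serving from it.
      static List<File> dirsToDelete(File[] oldIndexDirs, boolean afterCoreReload) {
        List<File> dirsList = Arrays.asList(oldIndexDirs);
        Collections.sort(dirsList, Collections.reverseOrder()); // newest first
        int from = afterCoreReload ? 1 : 0; // keep index 0 (most recent) on reload
        return dirsList.subList(from, dirsList.size());
      }
    }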
@@ -155,6 +155,10 @@ Bug Fixes

 * SOLR-10083: Fix instanceof check in ConstDoubleSource.equals (Pushkar Raste via Christine Poerschke)

+* SOLR-10120: A SolrCore reload can remove the index from the previous SolrCore during replication index rollover. (Mark Miller)
+
+* SOLR-10124: Replication can skip removing a temporary index directory in some cases when it should not. (Mark Miller)
+
 Optimizations
 ----------------------

@@ -23,8 +23,10 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.lang.invoke.MethodHandles;
 import java.nio.file.NoSuchFileException;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.List;

 import org.apache.commons.io.FileUtils;
 import org.apache.lucene.store.Directory;

@@ -326,7 +328,7 @@ public abstract class DirectoryFactory implements NamedListInitializedPlugin,
     return Collections.emptySet();
   }

-  public void cleanupOldIndexDirectories(final String dataDirPath, final String currentIndexDirPath) {
+  public void cleanupOldIndexDirectories(final String dataDirPath, final String currentIndexDirPath, boolean afterCoreReload) {
     File dataDir = new File(dataDirPath);
     if (!dataDir.isDirectory()) {
       log.debug("{} does not point to a valid data directory; skipping clean-up of old index directories.", dataDirPath);

@@ -347,9 +349,17 @@ public abstract class DirectoryFactory implements NamedListInitializedPlugin,
     if (oldIndexDirs == null || oldIndexDirs.length == 0)
       return; // nothing to do (no log message needed)

-    log.info("Found {} old index directories to clean-up under {}", oldIndexDirs.length, dataDirPath);
-    for (File dir : oldIndexDirs) {
+    List<File> dirsList = Arrays.asList(oldIndexDirs);
+    Collections.sort(dirsList, Collections.reverseOrder());
+
+    int i = 0;
+    if (afterCoreReload) {
+      log.info("Will not remove most recent old directory after reload {}", oldIndexDirs[0]);
+      i = 1;
+    }
+    log.info("Found {} old index directories to clean-up under {} afterReload={}", oldIndexDirs.length - i, dataDirPath, afterCoreReload);
+    for (; i < dirsList.size(); i++) {
+      File dir = dirsList.get(i);
       String dirToRmPath = dir.getAbsolutePath();
       try {
         if (deleteOldIndexDirectory(dirToRmPath)) {
@@ -16,14 +16,18 @@
  */
 package org.apache.solr.core;
 import java.io.IOException;
+import java.lang.invoke.MethodHandles;

 import org.apache.lucene.store.Directory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;

 /**
  * Directory provider for implementations that do not persist over reboots.
  *
  */
 public abstract class EphemeralDirectoryFactory extends CachingDirectoryFactory {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

   @Override
   public boolean exists(String path) throws IOException {

@@ -61,5 +65,9 @@ public abstract class EphemeralDirectoryFactory extends CachingDirectoryFactory
   public void remove(String path) throws IOException {
     // ram dir does not persist its dir anywhere
   }
+
+  public void cleanupOldIndexDirectories(final String dataDirPath, final String currentIndexDirPath, boolean reload) {
+    // currently a no-op
+  }

 }
@@ -21,8 +21,11 @@ import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY
 import java.io.IOException;
 import java.lang.invoke.MethodHandles;
 import java.net.URLEncoder;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
 import java.util.Locale;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;

@@ -505,7 +508,7 @@ public class HdfsDirectoryFactory extends CachingDirectoryFactory implements Sol
   }

   @Override
-  public void cleanupOldIndexDirectories(final String dataDir, final String currentIndexDir) {
+  public void cleanupOldIndexDirectories(final String dataDir, final String currentIndexDir, boolean afterReload) {

     // Get the FileSystem object
     final Path dataDirPath = new Path(dataDir);

@@ -549,13 +552,27 @@ public class HdfsDirectoryFactory extends CachingDirectoryFactory implements Sol
     } catch (IOException ioExc) {
       LOG.error("Error checking for old index directories to clean-up.", ioExc);
     }

+    List<Path> oldIndexPaths = new ArrayList<>(oldIndexDirs.length);
+    for (FileStatus ofs : oldIndexDirs) {
+      oldIndexPaths.add(ofs.getPath());
+    }
+
     if (oldIndexDirs == null || oldIndexDirs.length == 0)
       return; // nothing to clean-up

+    Collections.sort(oldIndexPaths, Collections.reverseOrder());
+
     Set<String> livePaths = getLivePaths();
-    for (FileStatus oldDir : oldIndexDirs) {
-      Path oldDirPath = oldDir.getPath();
+
+    int i = 0;
+    if (afterReload) {
+      LOG.info("Will not remove most recent old directory on reload {}", oldIndexDirs[0]);
+      i = 1;
+    }
+    LOG.info("Found {} old index directories to clean-up under {} afterReload={}", oldIndexDirs.length - i, dataDirPath, afterReload);
+    for (; i < oldIndexPaths.size(); i++) {
+      Path oldDirPath = oldIndexPaths.get(i);
       if (livePaths.contains(oldDirPath.toString())) {
         LOG.warn("Cannot delete directory {} because it is still being referenced in the cache.", oldDirPath);
       } else {
@@ -169,8 +169,8 @@ public class MetricsDirectoryFactory extends DirectoryFactory implements SolrCor
   }

   @Override
-  public void cleanupOldIndexDirectories(String dataDirPath, String currentIndexDirPath) {
-    in.cleanupOldIndexDirectories(dataDirPath, currentIndexDirPath);
+  public void cleanupOldIndexDirectories(String dataDirPath, String currentIndexDirPath, boolean reload) {
+    in.cleanupOldIndexDirectories(dataDirPath, currentIndexDirPath, reload);
   }

   @Override
@@ -617,34 +617,37 @@ public final class SolrCore implements SolrInfoMBean, Closeable {
   }

   public SolrCore reload(ConfigSet coreConfig) throws IOException {
-    solrCoreState.increfSolrCoreState();
-    final SolrCore currentCore;
-    if (!getNewIndexDir().equals(getIndexDir())) {
-      // the directory is changing, don't pass on state
-      currentCore = null;
-    } else {
-      currentCore = this;
-    }
+    // only one reload at a time
+    synchronized (getUpdateHandler().getSolrCoreState().getReloadLock()) {
+      solrCoreState.increfSolrCoreState();
+      final SolrCore currentCore;
+      if (!getNewIndexDir().equals(getIndexDir())) {
+        // the directory is changing, don't pass on state
+        currentCore = null;
+      } else {
+        currentCore = this;
+      }

-    boolean success = false;
-    SolrCore core = null;
-    try {
-      CoreDescriptor cd = new CoreDescriptor(coreDescriptor.getName(), coreDescriptor);
-      cd.loadExtraProperties(); //Reload the extra properties
-      core = new SolrCore(getName(), getDataDir(), coreConfig.getSolrConfig(),
-          coreConfig.getIndexSchema(), coreConfig.getProperties(),
-          cd, updateHandler, solrDelPolicy, currentCore);
-
-      // we open a new IndexWriter to pick up the latest config
-      core.getUpdateHandler().getSolrCoreState().newIndexWriter(core, false);
-
-      core.getSearcher(true, false, null, true);
-      success = true;
-      return core;
-    } finally {
-      // close the new core on any errors that have occurred.
-      if (!success) {
-        IOUtils.closeQuietly(core);
+      boolean success = false;
+      SolrCore core = null;
+      try {
+        CoreDescriptor cd = new CoreDescriptor(coreDescriptor.getName(), coreDescriptor);
+        cd.loadExtraProperties(); //Reload the extra properties
+        core = new SolrCore(getName(), getDataDir(), coreConfig.getSolrConfig(),
+            coreConfig.getIndexSchema(), coreConfig.getProperties(),
+            cd, updateHandler, solrDelPolicy, currentCore, true);
+
+        // we open a new IndexWriter to pick up the latest config
+        core.getUpdateHandler().getSolrCoreState().newIndexWriter(core, false);
+
+        core.getSearcher(true, false, null, true);
+        success = true;
+        return core;
+      } finally {
+        // close the new core on any errors that have occurred.
+        if (!success) {
+          IOUtils.closeQuietly(core);
+        }
       }
     }
   }
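Note the new synchronization: reloads are serialized on a dedicated lock object from SolrCoreState (added further down in this commit) rather than on the state itself, so a queued reload does not block unrelated users of the core state. A stripped-down sketch of the pattern, with illustrative names rather than the Solr API:

    class ReloadLockSketch {
      private final Object reloadLock = new Object();

      // Only one reload at a time: a second caller blocks here until the
      // first reload has fully built (or failed to build) the new core.
      Object reload() {
        synchronized (reloadLock) {
          // ... construct and return the replacement core ...
          return new Object();
        }
      }
    }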
@@ -686,7 +689,7 @@ public final class SolrCore implements SolrInfoMBean, Closeable {
     }
   }

-  void initIndex(boolean reload) throws IOException {
+  void initIndex(boolean passOnPreviousState, boolean reload) throws IOException {

     String indexDir = getNewIndexDir();
     boolean indexExists = getDirectoryFactory().exists(indexDir);

@@ -697,7 +700,7 @@ public final class SolrCore implements SolrInfoMBean, Closeable {

     initIndexReaderFactory();

-    if (indexExists && firstTime && !reload) {
+    if (indexExists && firstTime && !passOnPreviousState) {
       final String lockType = getSolrConfig().indexConfig.lockType;
       Directory dir = directoryFactory.get(indexDir, DirContext.DEFAULT, lockType);
       try {

@@ -726,7 +729,7 @@ public final class SolrCore implements SolrInfoMBean, Closeable {
     }


-    cleanupOldIndexDirectories();
+    cleanupOldIndexDirectories(reload);
   }


@@ -823,7 +826,7 @@ public final class SolrCore implements SolrInfoMBean, Closeable {

   public SolrCore(CoreDescriptor cd, ConfigSet coreConfig) {
     this(cd.getName(), null, coreConfig.getSolrConfig(), coreConfig.getIndexSchema(), coreConfig.getProperties(),
-        cd, null, null, null);
+        cd, null, null, null, false);
   }


@@ -843,7 +846,7 @@ public final class SolrCore implements SolrInfoMBean, Closeable {
   public SolrCore(String name, String dataDir, SolrConfig config,
                   IndexSchema schema, NamedList configSetProperties,
                   CoreDescriptor coreDescriptor, UpdateHandler updateHandler,
-                  IndexDeletionPolicyWrapper delPolicy, SolrCore prev) {
+                  IndexDeletionPolicyWrapper delPolicy, SolrCore prev, boolean reload) {

     assert ObjectReleaseTracker.track(searcherExecutor); // ensure that in unclean shutdown tests we still close this

@@ -905,7 +908,7 @@ public final class SolrCore implements SolrInfoMBean, Closeable {
     this.codec = initCodec(solrConfig, this.schema);

     memClassLoader = new MemClassLoader(PluginBag.RuntimeLib.getLibObjects(this, solrConfig.getPluginInfos(PluginBag.RuntimeLib.class.getName())), getResourceLoader());
-    initIndex(prev != null);
+    initIndex(prev != null, reload);

     initWriters();
     qParserPlugins.init(createInstances(QParserPlugin.standardPlugins), this);

@@ -1533,7 +1536,12 @@ public final class SolrCore implements SolrInfoMBean, Closeable {
     }

     if (coreStateClosed) {
+      try {
+        cleanupOldIndexDirectories(false);
+      } catch (Exception e) {
+        SolrException.log(log, e);
+      }

       try {
         directoryFactory.close();
       } catch (Throwable e) {

@@ -1542,7 +1550,6 @@ public final class SolrCore implements SolrInfoMBean, Closeable {
         throw (Error) e;
       }
     }
-
     }

     if( closeHooks != null ) {

@@ -1557,6 +1564,7 @@ public final class SolrCore implements SolrInfoMBean, Closeable {
       }
     }
   }

+    assert ObjectReleaseTracker.release(this);
   }

@@ -2952,16 +2960,16 @@ public final class SolrCore implements SolrInfoMBean, Closeable {
     return false;
   }

-  public void cleanupOldIndexDirectories() {
+  public void cleanupOldIndexDirectories(boolean reload) {
     final DirectoryFactory myDirFactory = getDirectoryFactory();
     final String myDataDir = getDataDir();
-    final String myIndexDir = getIndexDir();
+    final String myIndexDir = getNewIndexDir(); // ensure the latest replicated index is protected
     final String coreName = getName();
     if (myDirFactory != null && myDataDir != null && myIndexDir != null) {
       Thread cleanupThread = new Thread(() -> {
         log.debug("Looking for old index directories to cleanup for core {} in {}", coreName, myDataDir);
         try {
-          myDirFactory.cleanupOldIndexDirectories(myDataDir, myIndexDir);
+          myDirFactory.cleanupOldIndexDirectories(myDataDir, myIndexDir, reload);
         } catch (Exception exc) {
           log.error("Failed to cleanup old index directories for core "+coreName, exc);
         }
@@ -54,7 +54,6 @@ import java.util.zip.Adler32;
 import java.util.zip.Checksum;
 import java.util.zip.InflaterInputStream;

-import org.apache.commons.io.IOUtils;
 import org.apache.http.client.HttpClient;
 import org.apache.lucene.codecs.CodecUtil;
 import org.apache.lucene.index.IndexCommit;

@@ -75,6 +74,7 @@ import org.apache.solr.common.params.CommonParams;
 import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.common.util.ExecutorUtil;
 import org.apache.solr.common.util.FastInputStream;
+import org.apache.solr.common.util.IOUtils;
 import org.apache.solr.common.util.NamedList;
 import org.apache.solr.common.util.SuppressForbidden;
 import org.apache.solr.core.DirectoryFactory;

@@ -182,7 +182,13 @@ public class IndexFetcher {
     useInternalCompression = INTERNAL.equals(compress);
     useExternalCompression = EXTERNAL.equals(compress);
     connTimeout = getParameter(initArgs, HttpClientUtil.PROP_CONNECTION_TIMEOUT, 30000, null);
-    soTimeout = getParameter(initArgs, HttpClientUtil.PROP_SO_TIMEOUT, 120000, null);
+
+    // allow a master override for tests - you specify this in /replication slave section of solrconfig and some
+    // test don't want to define this
+    soTimeout = Integer.getInteger("solr.indexfetcher.sotimeout", -1);
+    if (soTimeout == -1) {
+      soTimeout = getParameter(initArgs, HttpClientUtil.PROP_SO_TIMEOUT, 120000, null);
+    }

     String httpBasicAuthUser = (String) initArgs.get(HttpClientUtil.PROP_BASIC_AUTH_USER);
     String httpBasicAuthPassword = (String) initArgs.get(HttpClientUtil.PROP_BASIC_AUTH_PASS);
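The comment above states the intent: tests that do not configure a /replication section can still bound the fetch socket timeout. Because the value is read with Integer.getInteger, a test can override it through a system property before the IndexFetcher is constructed; a sketch (the 45s value is illustrative, echoing the TestReplicationHandler change later in this commit):

    class SoTimeoutOverrideSketch {
      // "solr.indexfetcher.sotimeout" is consulted first; -1 means "fall back
      // to the /replication config, then to the 120s default".
      static void withShortFetchTimeout(Runnable test) {
        System.setProperty("solr.indexfetcher.sotimeout", "45000"); // 45s, illustrative
        try {
          test.run();
        } finally {
          System.clearProperty("solr.indexfetcher.sotimeout"); // restore default resolution
        }
      }
    }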
@@ -325,6 +331,7 @@ public class IndexFetcher {
     }

+    LOG.info("Slave's generation: " + commit.getGeneration());
     LOG.info("Slave's version: " + IndexDeletionPolicyWrapper.getCommitTimestamp(commit));

     if (latestVersion == 0L) {
       if (forceReplication && commit.getGeneration() != 0) {

@@ -459,7 +466,7 @@ public class IndexFetcher {
         downloadConfFiles(confFilesToDownload, latestGeneration);
         if (isFullCopyNeeded) {
           successfulInstall = solrCore.modifyIndexProps(tmpIdxDirName);
-          deleteTmpIdxDir = false;
+          if (successfulInstall) deleteTmpIdxDir = false;
         } else {
           successfulInstall = moveIndexFiles(tmpIndexDir, indexDir);
         }

@@ -487,7 +494,7 @@ public class IndexFetcher {
         terminateAndWaitFsyncService();
         if (isFullCopyNeeded) {
           successfulInstall = solrCore.modifyIndexProps(tmpIdxDirName);
-          deleteTmpIdxDir = false;
+          if (successfulInstall) deleteTmpIdxDir = false;
         } else {
           successfulInstall = moveIndexFiles(tmpIndexDir, indexDir);
         }

@@ -565,7 +572,8 @@ public class IndexFetcher {
       try {
         logReplicationTimeAndConfFiles(null, successfulInstall);
       } catch (Exception e) {
-        LOG.error("caught", e);
+        // this can happen on shutdown, a fetch may be running in a thread after DirectoryFactory is closed
+        LOG.warn("Could not log failed replication details", e);
       }
     }

@@ -583,25 +591,32 @@ public class IndexFetcher {
       stop = false;
       fsyncException = null;
     } finally {
-      if (deleteTmpIdxDir && tmpIndexDir != null) {
-        try {
-          core.getDirectoryFactory().doneWithDirectory(tmpIndexDir);
-          core.getDirectoryFactory().remove(tmpIndexDir);
-        } catch (IOException e) {
-          SolrException.log(LOG, "Error removing directory " + tmpIndexDir, e);
-        }
-      }
-
-      if (tmpIndexDir != null) {
-        core.getDirectoryFactory().release(tmpIndexDir);
-      }
-
-      if (indexDir != null) {
-        core.getDirectoryFactory().release(indexDir);
-      }
-
-      if (tmpTlogDir != null) {
-        delTree(tmpTlogDir);
-      }
+      // order below is important
+      try {
+        if (tmpIndexDir != null && deleteTmpIdxDir) {
+          core.getDirectoryFactory().doneWithDirectory(tmpIndexDir);
+          core.getDirectoryFactory().remove(tmpIndexDir);
+        }
+      } catch (Exception e) {
+        SolrException.log(LOG, e);
+      } finally {
+        try {
+          if (tmpIndexDir != null) core.getDirectoryFactory().release(tmpIndexDir);
+        } catch (Exception e) {
+          SolrException.log(LOG, e);
+        }
+        try {
+          if (indexDir != null) {
+            core.getDirectoryFactory().release(indexDir);
+          }
+        } catch (Exception e) {
+          SolrException.log(LOG, e);
+        }
+        try {
+          if (tmpTlogDir != null) delTree(tmpTlogDir);
+        } catch (Exception e) {
+          SolrException.log(LOG, e);
+        }
+      }
     }
   }
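"order below is important" is doing real work in the reworked finally block: doneWithDirectory/remove must run before the release calls, and each step now sits in its own try/catch so one failure cannot leak the remaining Directory references or the temporary tlog directory. The shape of that hardening, reduced to a sketch (illustrative helper, not the Solr API):

    import java.util.List;

    class IsolatedCleanupSketch {
      // Run ordered cleanup steps, isolating each failure so the later steps
      // (e.g. releasing other directories) still execute.
      static void runAll(List<Runnable> orderedSteps) {
        for (Runnable step : orderedSteps) {
          try {
            step.run();
          } catch (Exception e) {
            System.err.println("cleanup step failed, continuing: " + e);
          }
        }
      }
    }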
@@ -863,8 +878,9 @@ public class IndexFetcher {
       String filename = (String) file.get(NAME);
       long size = (Long) file.get(SIZE);
       CompareResult compareResult = compareFile(indexDir, filename, size, (Long) file.get(CHECKSUM));
-      if (!compareResult.equal || downloadCompleteIndex
-          || filesToAlwaysDownloadIfNoChecksums(filename, size, compareResult)) {
+      boolean alwaysDownload = filesToAlwaysDownloadIfNoChecksums(filename, size, compareResult);
+      LOG.debug("Downloading file={} size={} checksum={} alwaysDownload={}", filename, size, file.get(CHECKSUM), alwaysDownload);
+      if (!compareResult.equal || downloadCompleteIndex || alwaysDownload) {
         dirFileFetcher = new DirectoryFileFetcher(tmpIndexDir, file,
             (String) file.get(NAME), FILE, latestGeneration);
         currentFile = file;

@@ -915,7 +931,7 @@ public class IndexFetcher {
         compareResult.equal = true;
         return compareResult;
       } else {
-        LOG.warn(
+        LOG.info(
             "File {} did not match. expected length is {} and actual length is {}", filename, backupIndexFileLen, indexFileLen);
         compareResult.equal = false;
         return compareResult;

@@ -1349,15 +1365,15 @@ public class IndexFetcher {
   private class FileFetcher {
     private final FileInterface file;
     private boolean includeChecksum = true;
-    private String fileName;
-    private String saveAs;
-    private String solrParamOutput;
-    private Long indexGen;
+    private final String fileName;
+    private final String saveAs;
+    private final String solrParamOutput;
+    private final Long indexGen;

-    private long size;
+    private final long size;
     private long bytesDownloaded = 0;
     private byte[] buf = new byte[1024 * 1024];
-    private Checksum checksum;
+    private final Checksum checksum;
     private int errorCount = 0;
     private boolean aborted = false;

@@ -1369,8 +1385,11 @@ public class IndexFetcher {
       this.solrParamOutput = solrParamOutput;
       this.saveAs = saveAs;
       indexGen = latestGen;
-      if (includeChecksum)
+      if (includeChecksum) {
         checksum = new Adler32();
+      } else {
+        checksum = null;
+      }
     }

     public long getBytesDownloaded() {

@@ -1381,6 +1400,21 @@ public class IndexFetcher {
      * The main method which downloads file
      */
     public void fetchFile() throws Exception {
+      bytesDownloaded = 0;
+      try {
+        fetch();
+      } catch(Exception e) {
+        if (!aborted) {
+          SolrException.log(IndexFetcher.LOG, "Error fetching file, doing one retry...", e);
+          // one retry
+          fetch();
+        } else {
+          throw e;
+        }
+      }
+    }
+
+    private void fetch() throws Exception {
       try {
         while (true) {
           final FastInputStream is = getStream();

@@ -1569,7 +1603,7 @@ public class IndexFetcher {
         return new FastInputStream(is);
       } catch (Exception e) {
         //close stream on error
-        IOUtils.closeQuietly(is);
+        org.apache.commons.io.IOUtils.closeQuietly(is);
         throw new IOException("Could not download file '" + fileName + "'", e);
       }
     }
@@ -41,11 +41,17 @@ public abstract class SolrCoreState {

   protected boolean closed = false;
   private final Object updateLock = new Object();
+  private final Object reloadLock = new Object();

   public Object getUpdateLock() {
     return updateLock;
   }

+  public Object getReloadLock() {
+    return reloadLock;
+  }
+
+
   private int solrCoreStateRefCnt = 1;

   public void increfSolrCoreState() {
@@ -34,7 +34,7 @@

   <requestHandler name="/replication" class="solr.ReplicationHandler">
     <lst name="defaults">
-      <str name="maxWriteMBPerSec">0.1</str>
+      <str name="maxWriteMBPerSec">0.05</str>
     </lst>
   </requestHandler>

@@ -38,6 +38,8 @@ public class CleanupOldIndexTest extends SolrCloudTestCase {

   @BeforeClass
   public static void setupCluster() throws Exception {
+    // we restart jetty and expect to find on disk data - need a local fs directory
+    useFactory(null);
     configureCluster(2)
         .addConfig("conf1", TEST_PATH().resolve("configsets").resolve("cloud-dynamic").resolve("conf"))
         .configure();
@@ -165,7 +165,7 @@ public class HdfsDirectoryFactoryTest extends SolrTestCaseJ4 {
     hdfs.mkdirs(oldIndexDirPath);
     assertTrue(hdfs.isDirectory(oldIndexDirPath));

-    hdfsFactory.cleanupOldIndexDirectories(dataHomePath.toString(), currentIndexDirPath.toString());
+    hdfsFactory.cleanupOldIndexDirectories(dataHomePath.toString(), currentIndexDirPath.toString(), false);

     assertTrue(hdfs.isDirectory(currentIndexDirPath));
     assertTrue(!hdfs.isDirectory(oldIndexDirPath));
@@ -125,6 +125,8 @@ public class TestReplicationHandler extends SolrTestCaseJ4 {
     slave.setUp();
     slaveJetty = createJetty(slave);
     slaveClient = createNewSolrClient(slaveJetty.getLocalPort());
+
+    System.setProperty("solr.indexfetcher.sotimeout2", "45000");
   }

   public void clearIndexWithReplication() throws Exception {

@@ -147,6 +149,7 @@ public class TestReplicationHandler extends SolrTestCaseJ4 {
     masterClient.close();
     slaveClient.close();
     masterClient = slaveClient = null;
+    System.clearProperty("solr.indexfetcher.sotimeout");
   }

   private static JettySolrRunner createJetty(SolrInstance instance) throws Exception {

@@ -165,7 +168,7 @@ public class TestReplicationHandler extends SolrTestCaseJ4 {
       final String baseUrl = buildUrl(port) + "/" + DEFAULT_TEST_CORENAME;
       HttpSolrClient client = getHttpSolrClient(baseUrl);
       client.setConnectionTimeout(15000);
-      client.setSoTimeout(60000);
+      client.setSoTimeout(90000);
       return client;
     }
     catch (Exception ex) {

@@ -292,6 +295,16 @@ public class TestReplicationHandler extends SolrTestCaseJ4 {

   @Test
   public void doTestDetails() throws Exception {
+    slaveJetty.stop();
+
+    slave.copyConfigFile(CONF_DIR + "solrconfig-slave.xml", "solrconfig.xml");
+    slaveJetty = createJetty(slave);
+
+    slaveClient.close();
+    masterClient.close();
+    masterClient = createNewSolrClient(masterJetty.getLocalPort());
+    slaveClient = createNewSolrClient(slaveJetty.getLocalPort());
+
     clearIndexWithReplication();
     {
       NamedList<Object> details = getDetails(masterClient);

@@ -307,22 +320,34 @@ public class TestReplicationHandler extends SolrTestCaseJ4 {
     // check details on the slave a couple of times before & after fetching
     for (int i = 0; i < 3; i++) {
       NamedList<Object> details = getDetails(slaveClient);
-      List replicatedAtCount = (List) ((NamedList) details.get("slave")).get("indexReplicatedAtList");
       assertNotNull(i + ": " + details);
       assertNotNull(i + ": " + details.toString(), details.get("slave"));

       if (i > 0) {
-        assertEquals(i, replicatedAtCount.size());
+        rQuery(i, "*:*", slaveClient);
+        List replicatedAtCount = (List) ((NamedList) details.get("slave")).get("indexReplicatedAtList");
+        int tries = 0;
+        while ((replicatedAtCount == null || replicatedAtCount.size() < i) && tries++ < 5) {
+          Thread.currentThread().sleep(1000);
+          details = getDetails(slaveClient);
+          replicatedAtCount = (List) ((NamedList) details.get("slave")).get("indexReplicatedAtList");
+        }
+
+        assertNotNull("Expected to see that the slave has replicated" + i + ": " + details.toString(), replicatedAtCount);
+
+        // we can have more replications than we added docs because a replication can legally fail and try
+        // again (sometimes we cannot merge into a live index and have to try again)
+        assertTrue("i:" + i + " replicationCount:" + replicatedAtCount.size(), replicatedAtCount.size() >= i);
       }

-      assertEquals("slave isMaster?",
-          "false", details.get("isMaster"));
-      assertEquals("slave isSlave?",
-          "true", details.get("isSlave"));
-      assertNotNull("slave has slave section",
-          details.get("slave"));
+      assertEquals(i + ": " + "slave isMaster?", "false", details.get("isMaster"));
+      assertEquals(i + ": " + "slave isSlave?", "true", details.get("isSlave"));
+      assertNotNull(i + ": " + "slave has slave section", details.get("slave"));
       // SOLR-2677: assert not false negatives
       Object timesFailed = ((NamedList)details.get("slave")).get(IndexFetcher.TIMES_FAILED);
-      assertTrue("slave has fetch error count: " + (String)timesFailed, timesFailed == null || ((String) timesFailed).equals("1"));
+      // SOLR-7134: we can have a fail because some mock index files have no checksum, will
+      // always be downloaded, and may not be able to be moved into the existing index
+      assertTrue(i + ": " + "slave has fetch error count: " + (String)timesFailed, timesFailed == null || ((String) timesFailed).equals("1"));

       if (3 != i) {
         // index & fetch

@@ -544,7 +569,7 @@ public class TestReplicationHandler extends SolrTestCaseJ4 {
     assertTrue(slaveXsl.exists());

     checkForSingleIndex(masterJetty);
-    checkForSingleIndex(slaveJetty);
+    checkForSingleIndex(slaveJetty, true);

   }

@@ -907,6 +932,10 @@ public class TestReplicationHandler extends SolrTestCaseJ4 {
   }

   private void checkForSingleIndex(JettySolrRunner jetty) {
+    checkForSingleIndex(jetty, false);
+  }
+
+  private void checkForSingleIndex(JettySolrRunner jetty, boolean afterReload) {
     CoreContainer cores = jetty.getCoreContainer();
     Collection<SolrCore> theCores = cores.getCores();
     for (SolrCore core : theCores) {

@@ -914,13 +943,27 @@ public class TestReplicationHandler extends SolrTestCaseJ4 {
       CachingDirectoryFactory dirFactory = getCachingDirectoryFactory(core);
       synchronized (dirFactory) {
         Set<String> livePaths = dirFactory.getLivePaths();
-        // one for data, one for hte index under data and one for the snapshot metadata.
-        assertEquals(livePaths.toString(), 3, livePaths.size());
+        // one for data, one for the index under data and one for the snapshot metadata.
+        // we also allow one extra index dir - it may not be removed until the core is closed
+        if (afterReload) {
+          assertTrue(livePaths.toString() + ":" + livePaths.size(), 3 == livePaths.size() || 4 == livePaths.size());
+        } else {
+          assertTrue(livePaths.toString() + ":" + livePaths.size(), 3 == livePaths.size());
+        }
+
         // :TODO: assert that one of the paths is a subpath of hte other
       }
       if (dirFactory instanceof StandardDirectoryFactory) {
         System.out.println(Arrays.asList(new File(ddir).list()));
-        assertEquals(Arrays.asList(new File(ddir).list()).toString(), 1, indexDirCount(ddir));
+        // we also allow one extra index dir - it may not be removed until the core is closed
+        int cnt = indexDirCount(ddir);
+        // if after reload, there may be 2 index dirs while the reloaded SolrCore closes.
+        if (afterReload) {
+          assertTrue("found:" + cnt + Arrays.asList(new File(ddir).list()).toString(), 1 == cnt || 2 == cnt);
+        } else {
+          assertTrue("found:" + cnt + Arrays.asList(new File(ddir).list()).toString(), 1 == cnt);
+        }
+
       }
     }
   }

@@ -1337,17 +1380,8 @@ public class TestReplicationHandler extends SolrTestCaseJ4 {
     SolrDocumentList slaveQueryResult2 = (SolrDocumentList) slaveQueryRsp2.get("response");
     assertEquals(1, slaveQueryResult2.getNumFound());

-    index(slaveClient, "id", "2001", "name", "name = " + 2001, "newname", "n2001");
-    slaveClient.commit();
-
-    slaveQueryRsp = rQuery(1, "id:2001", slaveClient);
-    final SolrDocumentList sdl = (SolrDocumentList) slaveQueryRsp.get("response");
-    assertEquals(1, sdl.getNumFound());
-    final SolrDocument d = sdl.get(0);
-    assertEquals("n2001", (String) d.getFieldValue("newname"));
-
     checkForSingleIndex(masterJetty);
-    checkForSingleIndex(slaveJetty);
+    checkForSingleIndex(slaveJetty, true);
   }

   @Test
@@ -18,6 +18,7 @@ package org.apache.solr.util;

 import org.apache.lucene.util.LuceneTestCase;
 import org.apache.lucene.util.TestRuleLimitSysouts.Limit;
+import org.apache.solr.SolrTestCaseJ4;
 import org.apache.solr.common.util.ObjectReleaseTracker;
 import org.junit.Test;

@@ -29,12 +30,12 @@ public class TestObjectReleaseTracker extends LuceneTestCase {
   public void testObjectReleaseTracker() {
     ObjectReleaseTracker.track(new Object());
     ObjectReleaseTracker.release(new Object());
-    assertNotNull(ObjectReleaseTracker.clearObjectTrackerAndCheckEmpty(1));
-    assertNull(ObjectReleaseTracker.clearObjectTrackerAndCheckEmpty(1));
+    assertNotNull(SolrTestCaseJ4.clearObjectTrackerAndCheckEmpty(1));
+    assertNull(SolrTestCaseJ4.clearObjectTrackerAndCheckEmpty(1));
     Object obj = new Object();
     ObjectReleaseTracker.track(obj);
     ObjectReleaseTracker.release(obj);
-    assertNull(ObjectReleaseTracker.clearObjectTrackerAndCheckEmpty(1));
+    assertNull(SolrTestCaseJ4.clearObjectTrackerAndCheckEmpty(1));

     Object obj1 = new Object();
     ObjectReleaseTracker.track(obj1);

@@ -46,7 +47,7 @@ public class TestObjectReleaseTracker extends LuceneTestCase {
     ObjectReleaseTracker.release(obj1);
     ObjectReleaseTracker.release(obj2);
     ObjectReleaseTracker.release(obj3);
-    assertNull(ObjectReleaseTracker.clearObjectTrackerAndCheckEmpty(1));
+    assertNull(SolrTestCaseJ4.clearObjectTrackerAndCheckEmpty(1));

     ObjectReleaseTracker.track(obj1);
     ObjectReleaseTracker.track(obj2);

@@ -55,7 +56,7 @@ public class TestObjectReleaseTracker extends LuceneTestCase {
     ObjectReleaseTracker.release(obj1);
     ObjectReleaseTracker.release(obj2);
     // ObjectReleaseTracker.release(obj3);
-    assertNotNull(ObjectReleaseTracker.clearObjectTrackerAndCheckEmpty(1));
-    assertNull(ObjectReleaseTracker.clearObjectTrackerAndCheckEmpty(1));
+    assertNotNull(SolrTestCaseJ4.clearObjectTrackerAndCheckEmpty(1));
+    assertNull(SolrTestCaseJ4.clearObjectTrackerAndCheckEmpty(1));
   }
 }
@@ -28,7 +28,6 @@ import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeUnit;

 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

@@ -54,27 +53,6 @@ public class ObjectReleaseTracker {
     OBJECTS.clear();
   }

-  /**
-   * @return null if ok else error message
-   */
-  public static String clearObjectTrackerAndCheckEmpty(int waitSeconds) {
-    int retries = 0;
-    String result;
-    do {
-      result = checkEmpty();
-      if (result == null)
-        break;
-      try {
-        TimeUnit.SECONDS.sleep(1);
-      } catch (InterruptedException e) { break; }
-    }
-    while (retries++ < waitSeconds);
-
-    OBJECTS.clear();
-
-    return result;
-  }
-
   /**
    * @return null if ok else error message
    */
@@ -51,8 +51,12 @@ import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Properties;
+import java.util.Set;
+import java.util.Map.Entry;
+import java.util.concurrent.TimeUnit;

 import com.carrotsearch.randomizedtesting.RandomizedContext;
+import com.carrotsearch.randomizedtesting.TraceFormatting;
 import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
 import com.carrotsearch.randomizedtesting.rules.SystemPropertiesRestoreRule;
 import org.apache.commons.io.FileUtils;

@@ -154,6 +158,14 @@ public abstract class SolrTestCaseJ4 extends LuceneTestCase {

   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

+  private static final List<String> DEFAULT_STACK_FILTERS = Arrays.asList(new String [] {
+      "org.junit.",
+      "junit.framework.",
+      "sun.",
+      "java.lang.reflect.",
+      "com.carrotsearch.randomizedtesting.",
+  });
+
   public static final String DEFAULT_TEST_COLLECTION_NAME = "collection1";
   public static final String DEFAULT_TEST_CORENAME = DEFAULT_TEST_COLLECTION_NAME;
   protected static final String CORE_PROPERTIES_FILENAME = "core.properties";

@@ -285,7 +297,7 @@ public abstract class SolrTestCaseJ4 extends LuceneTestCase {
       // if the tests passed, make sure everything was closed / released
       if (!RandomizedContext.current().getTargetClass().isAnnotationPresent(SuppressObjectReleaseTracker.class)) {
         endTrackingSearchers(120, false);
-        String orr = ObjectReleaseTracker.clearObjectTrackerAndCheckEmpty(30);
+        String orr = clearObjectTrackerAndCheckEmpty(120);
         assertNull(orr, orr);
       } else {
         endTrackingSearchers(15, false);

@@ -323,6 +335,41 @@ public abstract class SolrTestCaseJ4 extends LuceneTestCase {
     LogLevel.Configurer.restoreLogLevels(savedClassLogLevels);
     savedClassLogLevels.clear();
   }
+
+  /**
+   * @return null if ok else error message
+   */
+  public static String clearObjectTrackerAndCheckEmpty(int waitSeconds) {
+    int retries = 0;
+    String result;
+    do {
+      result = ObjectReleaseTracker.checkEmpty();
+      if (result == null)
+        break;
+      try {
+        if (retries % 10 == 0) {
+          log.info("Waiting for all tracked resources to be released");
+          if (retries > 10) {
+            TraceFormatting tf = new TraceFormatting(DEFAULT_STACK_FILTERS);
+            Map<Thread,StackTraceElement[]> stacksMap = Thread.getAllStackTraces();
+            Set<Entry<Thread,StackTraceElement[]>> entries = stacksMap.entrySet();
+            for (Entry<Thread,StackTraceElement[]> entry : entries) {
+              String stack = tf.formatStackTrace(entry.getValue());
+              System.err.println(entry.getKey().getName() + ":\n" + stack);
+            }
+          }
+        }
+        TimeUnit.SECONDS.sleep(1);
+      } catch (InterruptedException e) { break; }
+    }
+    while (retries++ < waitSeconds);
+
+    log.info("------------------------------------------------------- Done waiting for tracked resources to be released");
+    ObjectReleaseTracker.clear();
+
+    return result;
+  }

   private static Map<String, String> savedClassLogLevels = new HashMap<>();

@@ -541,6 +588,18 @@ public abstract class SolrTestCaseJ4 extends LuceneTestCase {
       if (retries++ > waitSeconds) {
        break;
       }
+      if (retries % 10 == 0) {
+        log.info("Waiting for all SolrIndexSearchers to be released at end of test");
+        if (retries > 10) {
+          TraceFormatting tf = new TraceFormatting();
+          Map<Thread,StackTraceElement[]> stacksMap = Thread.getAllStackTraces();
+          Set<Entry<Thread,StackTraceElement[]>> entries = stacksMap.entrySet();
+          for (Entry<Thread,StackTraceElement[]> entry : entries) {
+            String stack = tf.formatStackTrace(entry.getValue());
+            System.err.println(entry.getKey().getName() + ":\n" + stack);
+          }
+        }
+      }
       try {
         Thread.sleep(1000);
       } catch (InterruptedException e) {}

@@ -548,6 +607,8 @@ public abstract class SolrTestCaseJ4 extends LuceneTestCase {
       endNumCloses = SolrIndexSearcher.numCloses.get();
     }

+    log.info("------------------------------------------------------- Done waiting for all SolrIndexSearchers to be released");
+
     SolrIndexSearcher.numOpens.getAndSet(0);
     SolrIndexSearcher.numCloses.getAndSet(0);
