mirror of https://github.com/apache/lucene.git
SOLR-12999: Index replication could delete segments before downloading segments from master if there is not enough disk space
This commit is contained in:
parent
430a810006
commit
b061947e91
|
@ -235,10 +235,16 @@ public final class SolrCore implements SolrInfoBean, SolrMetricProducer, Closeab
|
||||||
private Set<String> metricNames = ConcurrentHashMap.newKeySet();
|
private Set<String> metricNames = ConcurrentHashMap.newKeySet();
|
||||||
private String metricTag = Integer.toHexString(hashCode());
|
private String metricTag = Integer.toHexString(hashCode());
|
||||||
|
|
||||||
|
public boolean searchEnabled = true;
|
||||||
|
public boolean indexEnabled = true;
|
||||||
|
|
||||||
public Set<String> getMetricNames() {
|
public Set<String> getMetricNames() {
|
||||||
return metricNames;
|
return metricNames;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public boolean isSearchEnabled(){
|
||||||
|
return searchEnabled;
|
||||||
|
}
|
||||||
|
|
||||||
public Date getStartTimeStamp() { return startTime; }
|
public Date getStartTimeStamp() { return startTime; }
|
||||||
|
|
||||||
|
|
|
@ -27,6 +27,7 @@ import java.lang.invoke.MethodHandles;
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
import java.nio.channels.FileChannel;
|
import java.nio.channels.FileChannel;
|
||||||
import java.nio.charset.StandardCharsets;
|
import java.nio.charset.StandardCharsets;
|
||||||
|
import java.nio.file.FileStore;
|
||||||
import java.nio.file.FileSystems;
|
import java.nio.file.FileSystems;
|
||||||
import java.nio.file.Files;
|
import java.nio.file.Files;
|
||||||
import java.nio.file.NoSuchFileException;
|
import java.nio.file.NoSuchFileException;
|
||||||
|
@ -50,6 +51,8 @@ import java.util.concurrent.ExecutionException;
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
import java.util.concurrent.Future;
|
import java.util.concurrent.Future;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.function.BooleanSupplier;
|
||||||
|
import java.util.function.Function;
|
||||||
import java.util.zip.Adler32;
|
import java.util.zip.Adler32;
|
||||||
import java.util.zip.Checksum;
|
import java.util.zip.Checksum;
|
||||||
import java.util.zip.InflaterInputStream;
|
import java.util.zip.InflaterInputStream;
|
||||||
|
@ -168,6 +171,8 @@ public class IndexFetcher {
|
||||||
|
|
||||||
private boolean skipCommitOnMasterVersionZero = true;
|
private boolean skipCommitOnMasterVersionZero = true;
|
||||||
|
|
||||||
|
private boolean clearLocalIndexFirst = false;
|
||||||
|
|
||||||
private static final String INTERRUPT_RESPONSE_MESSAGE = "Interrupted while waiting for modify lock";
|
private static final String INTERRUPT_RESPONSE_MESSAGE = "Interrupted while waiting for modify lock";
|
||||||
|
|
||||||
public static class IndexFetchResult {
|
public static class IndexFetchResult {
|
||||||
|
@ -357,6 +362,7 @@ public class IndexFetcher {
|
||||||
*/
|
*/
|
||||||
IndexFetchResult fetchLatestIndex(boolean forceReplication, boolean forceCoreReload) throws IOException, InterruptedException {
|
IndexFetchResult fetchLatestIndex(boolean forceReplication, boolean forceCoreReload) throws IOException, InterruptedException {
|
||||||
|
|
||||||
|
this.clearLocalIndexFirst = false;
|
||||||
boolean cleanupDone = false;
|
boolean cleanupDone = false;
|
||||||
boolean successfulInstall = false;
|
boolean successfulInstall = false;
|
||||||
markReplicationStart();
|
markReplicationStart();
|
||||||
|
@ -596,7 +602,9 @@ public class IndexFetcher {
|
||||||
// let the system know we are changing dir's and the old one
|
// let the system know we are changing dir's and the old one
|
||||||
// may be closed
|
// may be closed
|
||||||
if (indexDir != null) {
|
if (indexDir != null) {
|
||||||
solrCore.getDirectoryFactory().doneWithDirectory(indexDir);
|
if (!this.clearLocalIndexFirst) {//it was closed earlier
|
||||||
|
solrCore.getDirectoryFactory().doneWithDirectory(indexDir);
|
||||||
|
}
|
||||||
// Cleanup all index files not associated with any *named* snapshot.
|
// Cleanup all index files not associated with any *named* snapshot.
|
||||||
solrCore.deleteNonSnapshotIndexFiles(indexDirPath);
|
solrCore.deleteNonSnapshotIndexFiles(indexDirPath);
|
||||||
}
|
}
|
||||||
|
@ -625,6 +633,8 @@ public class IndexFetcher {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
|
solrCore.searchEnabled = true;
|
||||||
|
solrCore.indexEnabled = true;
|
||||||
if (!isFullCopyNeeded) {
|
if (!isFullCopyNeeded) {
|
||||||
solrCore.getUpdateHandler().getSolrCoreState().openIndexWriter(solrCore);
|
solrCore.getUpdateHandler().getSolrCoreState().openIndexWriter(solrCore);
|
||||||
}
|
}
|
||||||
|
@ -803,6 +813,9 @@ public class IndexFetcher {
|
||||||
props.setProperty(INDEX_REPLICATED_AT, String.valueOf(replicationTime));
|
props.setProperty(INDEX_REPLICATED_AT, String.valueOf(replicationTime));
|
||||||
props.setProperty(PREVIOUS_CYCLE_TIME_TAKEN, String.valueOf(replicationTimeTaken));
|
props.setProperty(PREVIOUS_CYCLE_TIME_TAKEN, String.valueOf(replicationTimeTaken));
|
||||||
props.setProperty(TIMES_INDEX_REPLICATED, String.valueOf(indexCount));
|
props.setProperty(TIMES_INDEX_REPLICATED, String.valueOf(indexCount));
|
||||||
|
if (clearLocalIndexFirst) {
|
||||||
|
props.setProperty(CLEARED_LOCAL_IDX, "true");
|
||||||
|
}
|
||||||
if (modifiedConfFiles != null && !modifiedConfFiles.isEmpty()) {
|
if (modifiedConfFiles != null && !modifiedConfFiles.isEmpty()) {
|
||||||
props.setProperty(CONF_FILES_REPLICATED, confFiles.toString());
|
props.setProperty(CONF_FILES_REPLICATED, confFiles.toString());
|
||||||
props.setProperty(CONF_FILES_REPLICATED_AT, String.valueOf(replicationTime));
|
props.setProperty(CONF_FILES_REPLICATED_AT, String.valueOf(replicationTime));
|
||||||
|
@ -1007,6 +1020,19 @@ public class IndexFetcher {
|
||||||
&& (tmpIndexDir instanceof FSDirectory ||
|
&& (tmpIndexDir instanceof FSDirectory ||
|
||||||
(tmpIndexDir instanceof FilterDirectory && FilterDirectory.unwrap(tmpIndexDir) instanceof FSDirectory));
|
(tmpIndexDir instanceof FilterDirectory && FilterDirectory.unwrap(tmpIndexDir) instanceof FSDirectory));
|
||||||
|
|
||||||
|
|
||||||
|
long totalSpaceRequired = 0;
|
||||||
|
for (Map<String, Object> file : filesToDownload) {
|
||||||
|
long size = (Long) file.get(SIZE);
|
||||||
|
totalSpaceRequired += size;
|
||||||
|
}
|
||||||
|
|
||||||
|
log.info("tmpIndexDir_type : " + tmpIndexDir.getClass() + " , " + FilterDirectory.unwrap(tmpIndexDir));
|
||||||
|
long usableSpace = usableDiskSpaceProvider.apply(tmpIndexDirPath);
|
||||||
|
if (getApproxTotalSpaceReqd(totalSpaceRequired) > usableSpace) {
|
||||||
|
deleteFilesInAdvance(indexDir, indexDirPath, totalSpaceRequired, usableSpace);
|
||||||
|
}
|
||||||
|
|
||||||
for (Map<String,Object> file : filesToDownload) {
|
for (Map<String,Object> file : filesToDownload) {
|
||||||
String filename = (String) file.get(NAME);
|
String filename = (String) file.get(NAME);
|
||||||
long size = (Long) file.get(SIZE);
|
long size = (Long) file.get(SIZE);
|
||||||
|
@ -1039,6 +1065,82 @@ public class IndexFetcher {
|
||||||
return bytesDownloaded;
|
return bytesDownloaded;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//only for testing purposes. do not use this anywhere else
|
||||||
|
//-----------START----------------------
|
||||||
|
static BooleanSupplier testWait = () -> true;
|
||||||
|
static Function<String, Long> usableDiskSpaceProvider = dir -> getUsableSpace(dir);
|
||||||
|
//------------ END---------------------
|
||||||
|
|
||||||
|
|
||||||
|
private static Long getUsableSpace(String dir) {
|
||||||
|
try {
|
||||||
|
File file = new File(dir);
|
||||||
|
if (!file.exists()) {
|
||||||
|
file = file.getParentFile();
|
||||||
|
if (!file.exists()) {//this is not a disk directory . so just pretend that there is enough space
|
||||||
|
return Long.MAX_VALUE;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
FileStore fileStore = Files.getFileStore(file.toPath());
|
||||||
|
return fileStore.getUsableSpace();
|
||||||
|
} catch (IOException e) {
|
||||||
|
throw new SolrException(ErrorCode.SERVER_ERROR, "Could not free disk space", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
private long getApproxTotalSpaceReqd(long totalSpaceRequired) {
|
||||||
|
long approxTotalSpaceReqd = (long) (totalSpaceRequired * 1.05);// add 5% extra for safety
|
||||||
|
approxTotalSpaceReqd += (100 * 1024 * 1024); //we should have an extra of 100MB free after everything is downloaded
|
||||||
|
return approxTotalSpaceReqd;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void deleteFilesInAdvance(Directory indexDir, String indexDirPath, long usableDiskSpace, long totalSpaceRequired) throws IOException {
|
||||||
|
long actualSpaceReqd = totalSpaceRequired;
|
||||||
|
List<String> filesTobeDeleted = new ArrayList<>();
|
||||||
|
long clearedSpace = 0;
|
||||||
|
//go through each file to check if this needs to be deleted
|
||||||
|
for (String f : indexDir.listAll()) {
|
||||||
|
for (Map<String, Object> fileInfo : filesToDownload) {
|
||||||
|
if (f.equals(fileInfo.get(NAME))) {
|
||||||
|
String filename = (String) fileInfo.get(NAME);
|
||||||
|
long size = (Long) fileInfo.get(SIZE);
|
||||||
|
CompareResult compareResult = compareFile(indexDir, filename, size, (Long) fileInfo.get(CHECKSUM));
|
||||||
|
if (!compareResult.equal || filesToAlwaysDownloadIfNoChecksums(f, size, compareResult)) {
|
||||||
|
filesTobeDeleted.add(f);
|
||||||
|
clearedSpace += size;
|
||||||
|
} else {
|
||||||
|
/*this file will not be downloaded*/
|
||||||
|
actualSpaceReqd -= size;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (usableDiskSpace > getApproxTotalSpaceReqd(actualSpaceReqd)) {
|
||||||
|
// after considering the files actually available locally we really don't need to do any delete
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
log.info("This disk does not have enough space to download the index from leader/master. So cleaning up the local index. " +
|
||||||
|
" This may lead to loss of data/or node if index replication fails in between");
|
||||||
|
//now we should disable searchers and index writers because this core will not have all the required files
|
||||||
|
this.clearLocalIndexFirst = true;
|
||||||
|
this.solrCore.searchEnabled = false;
|
||||||
|
this.solrCore.indexEnabled = false;
|
||||||
|
solrCore.getDirectoryFactory().doneWithDirectory(indexDir);
|
||||||
|
solrCore.deleteNonSnapshotIndexFiles(indexDirPath);
|
||||||
|
this.solrCore.closeSearcher();
|
||||||
|
assert testWait.getAsBoolean();
|
||||||
|
solrCore.getUpdateHandler().getSolrCoreState().closeIndexWriter(this.solrCore, false);
|
||||||
|
for (String f : filesTobeDeleted) {
|
||||||
|
try {
|
||||||
|
indexDir.deleteFile(f);
|
||||||
|
} catch (FileNotFoundException | NoSuchFileException e) {
|
||||||
|
//no problem , it was deleted by someone else
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
static boolean filesToAlwaysDownloadIfNoChecksums(String filename,
|
static boolean filesToAlwaysDownloadIfNoChecksums(String filename,
|
||||||
long size, CompareResult compareResult) {
|
long size, CompareResult compareResult) {
|
||||||
// without checksums to compare, we always download .si, .liv, segments_N,
|
// without checksums to compare, we always download .si, .liv, segments_N,
|
||||||
|
@ -1879,6 +1981,8 @@ public class IndexFetcher {
|
||||||
|
|
||||||
static final String TIMES_INDEX_REPLICATED = "timesIndexReplicated";
|
static final String TIMES_INDEX_REPLICATED = "timesIndexReplicated";
|
||||||
|
|
||||||
|
static final String CLEARED_LOCAL_IDX = "clearedLocalIndexFirst";
|
||||||
|
|
||||||
static final String CONF_FILES_REPLICATED = "confFilesReplicated";
|
static final String CONF_FILES_REPLICATED = "confFilesReplicated";
|
||||||
|
|
||||||
static final String CONF_FILES_REPLICATED_AT = "confFilesReplicatedAt";
|
static final String CONF_FILES_REPLICATED_AT = "confFilesReplicatedAt";
|
||||||
|
|
|
@ -616,6 +616,14 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw
|
||||||
private void getFileList(SolrParams solrParams, SolrQueryResponse rsp) {
|
private void getFileList(SolrParams solrParams, SolrQueryResponse rsp) {
|
||||||
String v = solrParams.required().get(GENERATION);
|
String v = solrParams.required().get(GENERATION);
|
||||||
long gen = Long.parseLong(v);
|
long gen = Long.parseLong(v);
|
||||||
|
if (gen == -1) {
|
||||||
|
IndexCommit commitPoint = core.getDeletionPolicy().getLatestCommit();
|
||||||
|
if(commitPoint == null) {
|
||||||
|
rsp.add(CMD_GET_FILE_LIST, Collections.EMPTY_LIST);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
gen = commitPoint.getGeneration();
|
||||||
|
}
|
||||||
IndexCommit commit = core.getDeletionPolicy().getCommitPoint(gen);
|
IndexCommit commit = core.getDeletionPolicy().getCommitPoint(gen);
|
||||||
|
|
||||||
if (commit == null) {
|
if (commit == null) {
|
||||||
|
@ -974,6 +982,7 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw
|
||||||
addVal(slave, IndexFetcher.TIMES_FAILED, props, Integer.class);
|
addVal(slave, IndexFetcher.TIMES_FAILED, props, Integer.class);
|
||||||
addVal(slave, IndexFetcher.REPLICATION_FAILED_AT, props, Date.class);
|
addVal(slave, IndexFetcher.REPLICATION_FAILED_AT, props, Date.class);
|
||||||
addVal(slave, IndexFetcher.PREVIOUS_CYCLE_TIME_TAKEN, props, Long.class);
|
addVal(slave, IndexFetcher.PREVIOUS_CYCLE_TIME_TAKEN, props, Long.class);
|
||||||
|
addVal(slave, IndexFetcher.CLEARED_LOCAL_IDX, props, Long.class);
|
||||||
|
|
||||||
slave.add("currentDate", new Date().toString());
|
slave.add("currentDate", new Date().toString());
|
||||||
slave.add("isPollingDisabled", String.valueOf(isPollingDisabled()));
|
slave.add("isPollingDisabled", String.valueOf(isPollingDisabled()));
|
||||||
|
|
|
@ -16,20 +16,6 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.solr.request;
|
package org.apache.solr.request;
|
||||||
|
|
||||||
import org.apache.solr.api.ApiBag;
|
|
||||||
import org.apache.solr.common.SolrException;
|
|
||||||
import org.apache.solr.common.util.ValidatingJsonMap;
|
|
||||||
import org.apache.solr.common.util.SuppressForbidden;
|
|
||||||
import org.apache.solr.search.SolrIndexSearcher;
|
|
||||||
import org.apache.solr.common.util.CommandOperation;
|
|
||||||
import org.apache.solr.common.util.JsonSchemaValidator;
|
|
||||||
import org.apache.solr.util.RTimerTree;
|
|
||||||
import org.apache.solr.util.RefCounted;
|
|
||||||
import org.apache.solr.schema.IndexSchema;
|
|
||||||
import org.apache.solr.common.params.SolrParams;
|
|
||||||
import org.apache.solr.common.util.ContentStream;
|
|
||||||
import org.apache.solr.core.SolrCore;
|
|
||||||
|
|
||||||
import java.io.Closeable;
|
import java.io.Closeable;
|
||||||
import java.security.Principal;
|
import java.security.Principal;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
|
@ -37,6 +23,20 @@ import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
|
import org.apache.solr.api.ApiBag;
|
||||||
|
import org.apache.solr.common.SolrException;
|
||||||
|
import org.apache.solr.common.params.SolrParams;
|
||||||
|
import org.apache.solr.common.util.CommandOperation;
|
||||||
|
import org.apache.solr.common.util.ContentStream;
|
||||||
|
import org.apache.solr.common.util.JsonSchemaValidator;
|
||||||
|
import org.apache.solr.common.util.SuppressForbidden;
|
||||||
|
import org.apache.solr.common.util.ValidatingJsonMap;
|
||||||
|
import org.apache.solr.core.SolrCore;
|
||||||
|
import org.apache.solr.schema.IndexSchema;
|
||||||
|
import org.apache.solr.search.SolrIndexSearcher;
|
||||||
|
import org.apache.solr.util.RTimerTree;
|
||||||
|
import org.apache.solr.util.RefCounted;
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Base implementation of <code>SolrQueryRequest</code> that provides some
|
* Base implementation of <code>SolrQueryRequest</code> that provides some
|
||||||
|
@ -119,6 +119,10 @@ public abstract class SolrQueryRequestBase implements SolrQueryRequest, Closeabl
|
||||||
// should the core populate one in a factory method to create requests?
|
// should the core populate one in a factory method to create requests?
|
||||||
// or there could be a setSearcher() method that Solr calls
|
// or there could be a setSearcher() method that Solr calls
|
||||||
|
|
||||||
|
if(!core.isSearchEnabled()){
|
||||||
|
throw new SolrException( SolrException.ErrorCode.FORBIDDEN,"Search is temporarily disabled");
|
||||||
|
}
|
||||||
|
|
||||||
if (searcherHolder==null) {
|
if (searcherHolder==null) {
|
||||||
searcherHolder = core.getSearcher();
|
searcherHolder = core.getSearcher();
|
||||||
}
|
}
|
||||||
|
|
|
@ -34,8 +34,8 @@ import org.apache.lucene.index.MergePolicy;
|
||||||
import org.apache.lucene.search.Sort;
|
import org.apache.lucene.search.Sort;
|
||||||
import org.apache.solr.cloud.ActionThrottle;
|
import org.apache.solr.cloud.ActionThrottle;
|
||||||
import org.apache.solr.cloud.RecoveryStrategy;
|
import org.apache.solr.cloud.RecoveryStrategy;
|
||||||
import org.apache.solr.common.SolrException.ErrorCode;
|
|
||||||
import org.apache.solr.common.SolrException;
|
import org.apache.solr.common.SolrException;
|
||||||
|
import org.apache.solr.common.SolrException.ErrorCode;
|
||||||
import org.apache.solr.core.CoreContainer;
|
import org.apache.solr.core.CoreContainer;
|
||||||
import org.apache.solr.core.CoreDescriptor;
|
import org.apache.solr.core.CoreDescriptor;
|
||||||
import org.apache.solr.core.DirectoryFactory;
|
import org.apache.solr.core.DirectoryFactory;
|
||||||
|
@ -116,7 +116,9 @@ public final class DefaultSolrCoreState extends SolrCoreState implements Recover
|
||||||
@Override
|
@Override
|
||||||
public RefCounted<IndexWriter> getIndexWriter(SolrCore core)
|
public RefCounted<IndexWriter> getIndexWriter(SolrCore core)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
|
if (core != null && !core.indexEnabled) {
|
||||||
|
throw new SolrException(SolrException.ErrorCode.FORBIDDEN, "Indexing is temporarily disabled");
|
||||||
|
}
|
||||||
boolean succeeded = false;
|
boolean succeeded = false;
|
||||||
lock(iwLock.readLock());
|
lock(iwLock.readLock());
|
||||||
try {
|
try {
|
||||||
|
|
|
@ -157,7 +157,7 @@ public class TestReplicationHandler extends SolrTestCaseJ4 {
|
||||||
System.clearProperty("solr.indexfetcher.sotimeout");
|
System.clearProperty("solr.indexfetcher.sotimeout");
|
||||||
}
|
}
|
||||||
|
|
||||||
private static JettySolrRunner createAndStartJetty(SolrInstance instance) throws Exception {
|
static JettySolrRunner createAndStartJetty(SolrInstance instance) throws Exception {
|
||||||
FileUtils.copyFile(new File(SolrTestCaseJ4.TEST_HOME(), "solr.xml"), new File(instance.getHomeDir(), "solr.xml"));
|
FileUtils.copyFile(new File(SolrTestCaseJ4.TEST_HOME(), "solr.xml"), new File(instance.getHomeDir(), "solr.xml"));
|
||||||
Properties nodeProperties = new Properties();
|
Properties nodeProperties = new Properties();
|
||||||
nodeProperties.setProperty("solr.data.dir", instance.getDataDir());
|
nodeProperties.setProperty("solr.data.dir", instance.getDataDir());
|
||||||
|
@ -167,7 +167,7 @@ public class TestReplicationHandler extends SolrTestCaseJ4 {
|
||||||
return jetty;
|
return jetty;
|
||||||
}
|
}
|
||||||
|
|
||||||
private static SolrClient createNewSolrClient(int port) {
|
static SolrClient createNewSolrClient(int port) {
|
||||||
try {
|
try {
|
||||||
// setup the client...
|
// setup the client...
|
||||||
final String baseUrl = buildUrl(port) + "/" + DEFAULT_TEST_CORENAME;
|
final String baseUrl = buildUrl(port) + "/" + DEFAULT_TEST_CORENAME;
|
||||||
|
@ -179,7 +179,7 @@ public class TestReplicationHandler extends SolrTestCaseJ4 {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int index(SolrClient s, Object... fields) throws Exception {
|
static int index(SolrClient s, Object... fields) throws Exception {
|
||||||
SolrInputDocument doc = new SolrInputDocument();
|
SolrInputDocument doc = new SolrInputDocument();
|
||||||
for (int i = 0; i < fields.length; i += 2) {
|
for (int i = 0; i < fields.length; i += 2) {
|
||||||
doc.addField((String) (fields[i]), fields[i + 1]);
|
doc.addField((String) (fields[i]), fields[i + 1]);
|
||||||
|
@ -473,7 +473,7 @@ public class TestReplicationHandler extends SolrTestCaseJ4 {
|
||||||
|
|
||||||
//Simple function to wrap the invocation of replication commands on the various
|
//Simple function to wrap the invocation of replication commands on the various
|
||||||
//jetty servers.
|
//jetty servers.
|
||||||
private void invokeReplicationCommand(int pJettyPort, String pCommand) throws IOException
|
static void invokeReplicationCommand(int pJettyPort, String pCommand) throws IOException
|
||||||
{
|
{
|
||||||
String masterUrl = buildUrl(pJettyPort) + "/" + DEFAULT_TEST_CORENAME + ReplicationHandler.PATH+"?command=" + pCommand;
|
String masterUrl = buildUrl(pJettyPort) + "/" + DEFAULT_TEST_CORENAME + ReplicationHandler.PATH+"?command=" + pCommand;
|
||||||
URL u = new URL(masterUrl);
|
URL u = new URL(masterUrl);
|
||||||
|
@ -1486,7 +1486,7 @@ public class TestReplicationHandler extends SolrTestCaseJ4 {
|
||||||
q.add("qt", "/replication")
|
q.add("qt", "/replication")
|
||||||
.add("wt", "json")
|
.add("wt", "json")
|
||||||
.add("command", "filelist")
|
.add("command", "filelist")
|
||||||
.add("generation", "-1"); // A 'generation' value not matching any commit point should cause error.
|
.add("generation", "-2"); // A 'generation' value not matching any commit point should cause error.
|
||||||
QueryResponse response = slaveClient.query(q);
|
QueryResponse response = slaveClient.query(q);
|
||||||
NamedList<Object> resp = response.getResponse();
|
NamedList<Object> resp = response.getResponse();
|
||||||
assertNotNull(resp);
|
assertNotNull(resp);
|
||||||
|
|
|
@ -0,0 +1,190 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.solr.handler;
|
||||||
|
|
||||||
|
import java.io.StringWriter;
|
||||||
|
import java.lang.invoke.MethodHandles;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
import java.util.function.BooleanSupplier;
|
||||||
|
import java.util.function.Function;
|
||||||
|
|
||||||
|
import org.apache.lucene.util.TestUtil;
|
||||||
|
import org.apache.solr.SolrTestCaseJ4;
|
||||||
|
import org.apache.solr.client.solrj.SolrClient;
|
||||||
|
import org.apache.solr.client.solrj.SolrQuery;
|
||||||
|
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
|
||||||
|
import org.apache.solr.client.solrj.response.QueryResponse;
|
||||||
|
import org.apache.solr.common.util.Utils;
|
||||||
|
import org.apache.solr.util.LogLevel;
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import static org.apache.solr.handler.ReplicationHandler.CMD_FETCH_INDEX;
|
||||||
|
import static org.apache.solr.handler.ReplicationHandler.CMD_GET_FILE_LIST;
|
||||||
|
import static org.apache.solr.handler.TestReplicationHandler.createAndStartJetty;
|
||||||
|
import static org.apache.solr.handler.TestReplicationHandler.createNewSolrClient;
|
||||||
|
import static org.apache.solr.handler.TestReplicationHandler.invokeReplicationCommand;
|
||||||
|
|
||||||
|
@LogLevel("org.apache.solr.handler.IndexFetcher=DEBUG")
|
||||||
|
@SolrTestCaseJ4.SuppressSSL
|
||||||
|
public class TestReplicationHandlerDiskOverFlow extends SolrTestCaseJ4 {
|
||||||
|
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
JettySolrRunner masterJetty, slaveJetty;
|
||||||
|
SolrClient masterClient, slaveClient;
|
||||||
|
TestReplicationHandler.SolrInstance master = null, slave = null;
|
||||||
|
|
||||||
|
static String context = "/solr";
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setUp() throws Exception {
|
||||||
|
super.setUp();
|
||||||
|
System.setProperty("solr.directoryFactory", "solr.StandardDirectoryFactory");
|
||||||
|
String factory = random().nextInt(100) < 75 ? "solr.NRTCachingDirectoryFactory" : "solr.StandardDirectoryFactory"; // test the default most of the time
|
||||||
|
System.setProperty("solr.directoryFactory", factory);
|
||||||
|
master = new TestReplicationHandler.SolrInstance(createTempDir("solr-instance").toFile(), "master", null);
|
||||||
|
master.setUp();
|
||||||
|
masterJetty = createAndStartJetty(master);
|
||||||
|
masterClient = createNewSolrClient(masterJetty.getLocalPort());
|
||||||
|
|
||||||
|
slave = new TestReplicationHandler.SolrInstance(createTempDir("solr-instance").toFile(), "slave", masterJetty.getLocalPort());
|
||||||
|
slave.setUp();
|
||||||
|
slaveJetty = createAndStartJetty(slave);
|
||||||
|
slaveClient = createNewSolrClient(slaveJetty.getLocalPort());
|
||||||
|
|
||||||
|
System.setProperty("solr.indexfetcher.sotimeout2", "45000");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
@After
|
||||||
|
public void tearDown() throws Exception {
|
||||||
|
super.tearDown();
|
||||||
|
masterJetty.stop();
|
||||||
|
slaveJetty.stop();
|
||||||
|
masterJetty = slaveJetty = null;
|
||||||
|
master = slave = null;
|
||||||
|
masterClient.close();
|
||||||
|
slaveClient.close();
|
||||||
|
masterClient = slaveClient = null;
|
||||||
|
System.clearProperty("solr.indexfetcher.sotimeout");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testDiskOverFlow() throws Exception {
|
||||||
|
invokeReplicationCommand(slaveJetty.getLocalPort(), "disablepoll");
|
||||||
|
//index docs
|
||||||
|
System.out.println("MASTER");
|
||||||
|
int docsInMaster = 1000;
|
||||||
|
long szMaster = indexDocs(masterClient, docsInMaster, 0);
|
||||||
|
System.out.println("SLAVE");
|
||||||
|
long szSlave = indexDocs(slaveClient, 1200, 1000);
|
||||||
|
|
||||||
|
|
||||||
|
Function<String, Long> originalDiskSpaceprovider = IndexFetcher.usableDiskSpaceProvider;
|
||||||
|
IndexFetcher.usableDiskSpaceProvider = new Function<String, Long>() {
|
||||||
|
@Override
|
||||||
|
public Long apply(String s) {
|
||||||
|
return szMaster;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
QueryResponse response;
|
||||||
|
CountDownLatch latch = new CountDownLatch(1);
|
||||||
|
AtomicBoolean searchDisabledFound = new AtomicBoolean(false);
|
||||||
|
try {
|
||||||
|
IndexFetcher.testWait = new BooleanSupplier() {
|
||||||
|
@Override
|
||||||
|
public boolean getAsBoolean() {
|
||||||
|
try {
|
||||||
|
latch.await(5, TimeUnit.SECONDS);
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
|
||||||
|
new Thread(() -> {
|
||||||
|
for (int i = 0; i < 20; i++) {
|
||||||
|
try {
|
||||||
|
QueryResponse rsp = slaveClient.query(new SolrQuery()
|
||||||
|
.setQuery("*:*")
|
||||||
|
.setRows(0));
|
||||||
|
Thread.sleep(100);
|
||||||
|
} catch (Exception e) {
|
||||||
|
if (e.getMessage().contains("Search is temporarily disabled")) {
|
||||||
|
searchDisabledFound.set(true);
|
||||||
|
}
|
||||||
|
latch.countDown();
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}).start();
|
||||||
|
|
||||||
|
response = slaveClient.query(new SolrQuery()
|
||||||
|
.add("qt", "/replication")
|
||||||
|
.add("command", CMD_FETCH_INDEX)
|
||||||
|
.add("wait", "true")
|
||||||
|
);
|
||||||
|
|
||||||
|
} finally {
|
||||||
|
IndexFetcher.usableDiskSpaceProvider = originalDiskSpaceprovider;
|
||||||
|
}
|
||||||
|
assertTrue(searchDisabledFound.get());
|
||||||
|
assertEquals("OK", response._getStr("status", null));
|
||||||
|
// System.out.println("MASTER INDEX: " + szMaster);
|
||||||
|
// System.out.println("SLAVE INDEX: " + szSlave);
|
||||||
|
|
||||||
|
response = slaveClient.query(new SolrQuery().setQuery("*:*").setRows(0));
|
||||||
|
assertEquals(docsInMaster, response.getResults().getNumFound());
|
||||||
|
|
||||||
|
response = slaveClient.query(new SolrQuery()
|
||||||
|
.add("qt", "/replication")
|
||||||
|
.add("command", ReplicationHandler.CMD_DETAILS)
|
||||||
|
);
|
||||||
|
System.out.println("DETAILS" + Utils.writeJson(response, new StringWriter(), true).toString());
|
||||||
|
assertEquals("true", response._getStr("details/slave/clearedLocalIndexFirst", null));
|
||||||
|
}
|
||||||
|
|
||||||
|
private long indexDocs(SolrClient client, int totalDocs, int start) throws Exception {
|
||||||
|
for (int i = 0; i < totalDocs; i++)
|
||||||
|
TestReplicationHandler.index(client, "id", i + start, "name", TestUtil.randomSimpleString(random(), 1000, 5000));
|
||||||
|
client.commit(true, true);
|
||||||
|
QueryResponse response = client.query(new SolrQuery()
|
||||||
|
.add("qt", "/replication")
|
||||||
|
.add("command", "filelist")
|
||||||
|
.add("generation", "-1"));
|
||||||
|
|
||||||
|
long totalSize = 0;
|
||||||
|
for (Map map : (List<Map>) response.getResponse().get(CMD_GET_FILE_LIST)) {
|
||||||
|
Long sz = (Long) map.get(ReplicationHandler.SIZE);
|
||||||
|
totalSize += sz;
|
||||||
|
}
|
||||||
|
return totalSize;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
Loading…
Reference in New Issue