mirror of https://github.com/apache/lucene.git
SOLR-9236: AutoAddReplicas will append an extra /tlog to the update log location on replica failover.
This commit is contained in:
parent
182f01365f
commit
dc763e383d
|
@ -97,6 +97,9 @@ Bug Fixes
|
||||||
* SOLR-8858: SolrIndexSearcher#doc() completely ignores field filters unless lazy field loading is enabled.
|
* SOLR-8858: SolrIndexSearcher#doc() completely ignores field filters unless lazy field loading is enabled.
|
||||||
(Caleb Rackliffe, David Smiley, shalin)
|
(Caleb Rackliffe, David Smiley, shalin)
|
||||||
|
|
||||||
|
* SOLR-9236: AutoAddReplicas will append an extra /tlog to the update log location on replica failover.
|
||||||
|
(Eungsop Yoo, Mark Miller)
|
||||||
|
|
||||||
Optimizations
|
Optimizations
|
||||||
----------------------
|
----------------------
|
||||||
|
|
||||||
|
|
|
@ -451,7 +451,7 @@ public class OverseerAutoReplicaFailoverThread implements Runnable, Closeable {
|
||||||
// for now, the collections API will use unique names
|
// for now, the collections API will use unique names
|
||||||
createCmd.setCoreName(coreName);
|
createCmd.setCoreName(coreName);
|
||||||
createCmd.setDataDir(dataDir);
|
createCmd.setDataDir(dataDir);
|
||||||
createCmd.setUlogDir(ulogDir);
|
createCmd.setUlogDir(ulogDir.substring(0, ulogDir.length() - "/tlog".length()));
|
||||||
client.request(createCmd);
|
client.request(createCmd);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
SolrException.log(log, "Exception trying to create new replica on " + createUrl, e);
|
SolrException.log(log, "Exception trying to create new replica on " + createUrl, e);
|
||||||
|
|
|
@ -18,7 +18,9 @@ package org.apache.solr.cloud;
|
||||||
|
|
||||||
import static org.apache.solr.common.util.Utils.makeMap;
|
import static org.apache.solr.common.util.Utils.makeMap;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
|
import java.util.HashMap;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
@ -30,29 +32,36 @@ import java.util.concurrent.ThreadPoolExecutor;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||||
import org.apache.lucene.util.LuceneTestCase.Nightly;
|
|
||||||
import org.apache.lucene.util.LuceneTestCase.Slow;
|
import org.apache.lucene.util.LuceneTestCase.Slow;
|
||||||
import org.apache.solr.SolrTestCaseJ4.SuppressSSL;
|
import org.apache.solr.SolrTestCaseJ4.SuppressSSL;
|
||||||
|
import org.apache.solr.client.solrj.SolrQuery;
|
||||||
import org.apache.solr.client.solrj.SolrRequest;
|
import org.apache.solr.client.solrj.SolrRequest;
|
||||||
|
import org.apache.solr.client.solrj.SolrServerException;
|
||||||
import org.apache.solr.client.solrj.request.CollectionAdminRequest.Create;
|
import org.apache.solr.client.solrj.request.CollectionAdminRequest.Create;
|
||||||
import org.apache.solr.client.solrj.request.QueryRequest;
|
import org.apache.solr.client.solrj.request.QueryRequest;
|
||||||
import org.apache.solr.client.solrj.response.CollectionAdminResponse;
|
import org.apache.solr.client.solrj.response.CollectionAdminResponse;
|
||||||
|
import org.apache.solr.client.solrj.response.QueryResponse;
|
||||||
import org.apache.solr.cloud.hdfs.HdfsTestUtil;
|
import org.apache.solr.cloud.hdfs.HdfsTestUtil;
|
||||||
|
import org.apache.solr.common.SolrInputDocument;
|
||||||
import org.apache.solr.common.cloud.ClusterStateUtil;
|
import org.apache.solr.common.cloud.ClusterStateUtil;
|
||||||
|
import org.apache.solr.common.cloud.Replica;
|
||||||
import org.apache.solr.common.cloud.Slice;
|
import org.apache.solr.common.cloud.Slice;
|
||||||
import org.apache.solr.common.cloud.ZkStateReader;
|
import org.apache.solr.common.cloud.ZkStateReader;
|
||||||
import org.apache.solr.common.params.CollectionParams;
|
import org.apache.solr.common.params.CollectionParams;
|
||||||
import org.apache.solr.common.params.MapSolrParams;
|
import org.apache.solr.common.params.MapSolrParams;
|
||||||
|
import org.apache.solr.common.params.SolrParams;
|
||||||
import org.apache.solr.common.util.ExecutorUtil;
|
import org.apache.solr.common.util.ExecutorUtil;
|
||||||
|
import org.apache.solr.core.CoreDescriptor;
|
||||||
|
import org.apache.solr.update.DirectUpdateHandler2;
|
||||||
import org.apache.solr.util.DefaultSolrThreadFactory;
|
import org.apache.solr.util.DefaultSolrThreadFactory;
|
||||||
import org.apache.solr.util.BadHdfsThreadsFilter;
|
import org.apache.solr.util.BadHdfsThreadsFilter;
|
||||||
import org.junit.AfterClass;
|
import org.junit.AfterClass;
|
||||||
|
import org.junit.Before;
|
||||||
import org.junit.BeforeClass;
|
import org.junit.BeforeClass;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
|
import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
|
||||||
|
|
||||||
|
|
||||||
@Nightly
|
@Nightly
|
||||||
@Slow
|
@Slow
|
||||||
@SuppressSSL
|
@SuppressSSL
|
||||||
|
@ -70,11 +79,13 @@ public class SharedFSAutoReplicaFailoverTest extends AbstractFullDistribZkTestBa
|
||||||
|
|
||||||
CompletionService<Object> completionService;
|
CompletionService<Object> completionService;
|
||||||
Set<Future<Object>> pending;
|
Set<Future<Object>> pending;
|
||||||
|
private final Map<String, String> collectionUlogDirMap = new HashMap<>();
|
||||||
|
|
||||||
|
|
||||||
@BeforeClass
|
@BeforeClass
|
||||||
public static void hdfsFailoverBeforeClass() throws Exception {
|
public static void hdfsFailoverBeforeClass() throws Exception {
|
||||||
dfsCluster = HdfsTestUtil.setupClass(createTempDir().toFile().getAbsolutePath());
|
dfsCluster = HdfsTestUtil.setupClass(createTempDir().toFile().getAbsolutePath());
|
||||||
|
schemaString = "schema15.xml";
|
||||||
}
|
}
|
||||||
|
|
||||||
@AfterClass
|
@AfterClass
|
||||||
|
@ -83,6 +94,13 @@ public class SharedFSAutoReplicaFailoverTest extends AbstractFullDistribZkTestBa
|
||||||
dfsCluster = null;
|
dfsCluster = null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Before
|
||||||
|
@Override
|
||||||
|
public void setUp() throws Exception {
|
||||||
|
super.setUp();
|
||||||
|
collectionUlogDirMap.clear();
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void distribSetUp() throws Exception {
|
public void distribSetUp() throws Exception {
|
||||||
super.distribSetUp();
|
super.distribSetUp();
|
||||||
|
@ -95,7 +113,6 @@ public class SharedFSAutoReplicaFailoverTest extends AbstractFullDistribZkTestBa
|
||||||
|
|
||||||
|
|
||||||
public SharedFSAutoReplicaFailoverTest() {
|
public SharedFSAutoReplicaFailoverTest() {
|
||||||
sliceCount = 2;
|
|
||||||
completionService = new ExecutorCompletionService<>(executor);
|
completionService = new ExecutorCompletionService<>(executor);
|
||||||
pending = new HashSet<>();
|
pending = new HashSet<>();
|
||||||
}
|
}
|
||||||
|
@ -104,8 +121,11 @@ public class SharedFSAutoReplicaFailoverTest extends AbstractFullDistribZkTestBa
|
||||||
@ShardsFixed(num = 4)
|
@ShardsFixed(num = 4)
|
||||||
public void test() throws Exception {
|
public void test() throws Exception {
|
||||||
try {
|
try {
|
||||||
|
// to keep uncommitted docs during failover
|
||||||
|
DirectUpdateHandler2.commitOnClose = false;
|
||||||
testBasics();
|
testBasics();
|
||||||
} finally {
|
} finally {
|
||||||
|
DirectUpdateHandler2.commitOnClose = true;
|
||||||
if (DEBUG) {
|
if (DEBUG) {
|
||||||
super.printLayout();
|
super.printLayout();
|
||||||
}
|
}
|
||||||
|
@ -162,6 +182,35 @@ public class SharedFSAutoReplicaFailoverTest extends AbstractFullDistribZkTestBa
|
||||||
|
|
||||||
waitForRecoveriesToFinish(collection3, false);
|
waitForRecoveriesToFinish(collection3, false);
|
||||||
|
|
||||||
|
// a collection has only 1 replica per a shard
|
||||||
|
String collection4 = "solrj_collection4";
|
||||||
|
createCollectionRequest = new Create()
|
||||||
|
.setCollectionName(collection4)
|
||||||
|
.setNumShards(5)
|
||||||
|
.setReplicationFactor(1)
|
||||||
|
.setMaxShardsPerNode(5)
|
||||||
|
.setConfigName("conf1")
|
||||||
|
.setRouterField("text")
|
||||||
|
.setAutoAddReplicas(true);
|
||||||
|
CollectionAdminResponse response4 = createCollectionRequest.process(getCommonCloudSolrClient());
|
||||||
|
|
||||||
|
assertEquals(0, response4.getStatus());
|
||||||
|
assertTrue(response4.isSuccess());
|
||||||
|
|
||||||
|
waitForRecoveriesToFinish(collection4, false);
|
||||||
|
|
||||||
|
// all collections
|
||||||
|
String[] collections = {collection1, collection2, collection3, collection4};
|
||||||
|
|
||||||
|
// add some documents to collection4
|
||||||
|
final int numDocs = 100;
|
||||||
|
addDocs(collection4, numDocs, false); // indexed but not committed
|
||||||
|
|
||||||
|
// no result because not committed yet
|
||||||
|
queryAndAssertResultSize(collection4, 0, 10000);
|
||||||
|
|
||||||
|
assertUlogDir(collections);
|
||||||
|
|
||||||
ChaosMonkey.stop(jettys.get(1));
|
ChaosMonkey.stop(jettys.get(1));
|
||||||
ChaosMonkey.stop(jettys.get(2));
|
ChaosMonkey.stop(jettys.get(2));
|
||||||
|
|
||||||
|
@ -177,11 +226,21 @@ public class SharedFSAutoReplicaFailoverTest extends AbstractFullDistribZkTestBa
|
||||||
// collection3 has maxShardsPerNode=1, there are 4 standard jetties and one control jetty and 2 nodes stopped
|
// collection3 has maxShardsPerNode=1, there are 4 standard jetties and one control jetty and 2 nodes stopped
|
||||||
ClusterStateUtil.waitForLiveAndActiveReplicaCount(cloudClient.getZkStateReader(), collection3, 3, 30000);
|
ClusterStateUtil.waitForLiveAndActiveReplicaCount(cloudClient.getZkStateReader(), collection3, 3, 30000);
|
||||||
|
|
||||||
|
// collection4 has maxShardsPerNode=5 and setMaxShardsPerNode=5
|
||||||
|
ClusterStateUtil.waitForLiveAndActiveReplicaCount(cloudClient.getZkStateReader(), collection4, 5, 30000);
|
||||||
|
|
||||||
|
// all docs should be queried after failover
|
||||||
|
cloudClient.commit(); // to query all docs
|
||||||
|
assertSingleReplicationAndShardSize(collection4, 5);
|
||||||
|
queryAndAssertResultSize(collection4, numDocs, 10000);
|
||||||
|
|
||||||
// collection1 should still be at 4
|
// collection1 should still be at 4
|
||||||
assertEquals(4, ClusterStateUtil.getLiveAndActiveReplicaCount(cloudClient.getZkStateReader(), collection1));
|
assertEquals(4, ClusterStateUtil.getLiveAndActiveReplicaCount(cloudClient.getZkStateReader(), collection1));
|
||||||
// and collection2 less than 4
|
// and collection2 less than 4
|
||||||
assertTrue(ClusterStateUtil.getLiveAndActiveReplicaCount(cloudClient.getZkStateReader(), collection2) < 4);
|
assertTrue(ClusterStateUtil.getLiveAndActiveReplicaCount(cloudClient.getZkStateReader(), collection2) < 4);
|
||||||
|
|
||||||
|
assertUlogDir(collections);
|
||||||
|
|
||||||
ChaosMonkey.stop(jettys);
|
ChaosMonkey.stop(jettys);
|
||||||
ChaosMonkey.stop(controlJetty);
|
ChaosMonkey.stop(controlJetty);
|
||||||
|
|
||||||
|
@ -195,6 +254,12 @@ public class SharedFSAutoReplicaFailoverTest extends AbstractFullDistribZkTestBa
|
||||||
assertSliceAndReplicaCount(collection1);
|
assertSliceAndReplicaCount(collection1);
|
||||||
assertSingleReplicationAndShardSize(collection3, 5);
|
assertSingleReplicationAndShardSize(collection3, 5);
|
||||||
|
|
||||||
|
// all docs should be queried
|
||||||
|
assertSingleReplicationAndShardSize(collection4, 5);
|
||||||
|
queryAndAssertResultSize(collection4, numDocs, 10000);
|
||||||
|
|
||||||
|
assertUlogDir(collections);
|
||||||
|
|
||||||
int jettyIndex = random().nextInt(jettys.size());
|
int jettyIndex = random().nextInt(jettys.size());
|
||||||
ChaosMonkey.stop(jettys.get(jettyIndex));
|
ChaosMonkey.stop(jettys.get(jettyIndex));
|
||||||
ChaosMonkey.start(jettys.get(jettyIndex));
|
ChaosMonkey.start(jettys.get(jettyIndex));
|
||||||
|
@ -203,8 +268,14 @@ public class SharedFSAutoReplicaFailoverTest extends AbstractFullDistribZkTestBa
|
||||||
|
|
||||||
assertSliceAndReplicaCount(collection1);
|
assertSliceAndReplicaCount(collection1);
|
||||||
|
|
||||||
|
assertUlogDir(collections);
|
||||||
|
|
||||||
assertSingleReplicationAndShardSize(collection3, 5);
|
assertSingleReplicationAndShardSize(collection3, 5);
|
||||||
ClusterStateUtil.waitForLiveAndActiveReplicaCount(cloudClient.getZkStateReader(), collection3, 5, 30000);
|
ClusterStateUtil.waitForLiveAndActiveReplicaCount(cloudClient.getZkStateReader(), collection3, 5, 30000);
|
||||||
|
|
||||||
|
assertSingleReplicationAndShardSize(collection4, 5);
|
||||||
|
ClusterStateUtil.waitForLiveAndActiveReplicaCount(cloudClient.getZkStateReader(), collection4, 5, 30000);
|
||||||
|
|
||||||
//disable autoAddReplicas
|
//disable autoAddReplicas
|
||||||
Map m = makeMap(
|
Map m = makeMap(
|
||||||
"action", CollectionParams.CollectionAction.CLUSTERPROP.toLower(),
|
"action", CollectionParams.CollectionAction.CLUSTERPROP.toLower(),
|
||||||
|
@ -236,6 +307,66 @@ public class SharedFSAutoReplicaFailoverTest extends AbstractFullDistribZkTestBa
|
||||||
|
|
||||||
assertTrue("Timeout waiting for all live and active", ClusterStateUtil.waitForAllActiveAndLiveReplicas(cloudClient.getZkStateReader(), collection1, 60000));
|
assertTrue("Timeout waiting for all live and active", ClusterStateUtil.waitForAllActiveAndLiveReplicas(cloudClient.getZkStateReader(), collection1, 60000));
|
||||||
assertSliceAndReplicaCount(collection1);
|
assertSliceAndReplicaCount(collection1);
|
||||||
|
|
||||||
|
assertUlogDir(collections);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void queryAndAssertResultSize(String collection, int expectedResultSize, int timeoutMS)
|
||||||
|
throws SolrServerException, IOException, InterruptedException {
|
||||||
|
long startTimestamp = System.currentTimeMillis();
|
||||||
|
|
||||||
|
long actualResultSize = 0;
|
||||||
|
while(true) {
|
||||||
|
if (System.currentTimeMillis() - startTimestamp > timeoutMS || actualResultSize > expectedResultSize) {
|
||||||
|
fail("expected: " + expectedResultSize + ", actual: " + actualResultSize);
|
||||||
|
}
|
||||||
|
SolrParams queryAll = new SolrQuery("*:*");
|
||||||
|
cloudClient.setDefaultCollection(collection);
|
||||||
|
QueryResponse queryResponse = cloudClient.query(queryAll);
|
||||||
|
actualResultSize = queryResponse.getResults().getNumFound();
|
||||||
|
if(expectedResultSize == actualResultSize) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
Thread.sleep(1000);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void addDocs(String collection, int numDocs, boolean commit) throws SolrServerException, IOException {
|
||||||
|
for (int docId = 1; docId <= numDocs; docId++) {
|
||||||
|
SolrInputDocument doc = new SolrInputDocument();
|
||||||
|
doc.addField("id", docId);
|
||||||
|
doc.addField("text", "shard" + docId % 5);
|
||||||
|
cloudClient.setDefaultCollection(collection);
|
||||||
|
cloudClient.add(doc);
|
||||||
|
}
|
||||||
|
if (commit) {
|
||||||
|
cloudClient.commit();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* After failover, ulogDir should not be changed.
|
||||||
|
*/
|
||||||
|
private void assertUlogDir(String... collections) {
|
||||||
|
for (String collection : collections) {
|
||||||
|
Collection<Slice> slices = cloudClient.getZkStateReader().getClusterState().getCollection(collection).getSlices();
|
||||||
|
for (Slice slice : slices) {
|
||||||
|
for (Replica replica : slice.getReplicas()) {
|
||||||
|
Map<String, Object> properties = replica.getProperties();
|
||||||
|
String coreName = replica.getCoreName();
|
||||||
|
String curUlogDir = (String) properties.get(CoreDescriptor.CORE_ULOGDIR);
|
||||||
|
String prevUlogDir = collectionUlogDirMap.get(coreName);
|
||||||
|
if (curUlogDir != null) {
|
||||||
|
if (prevUlogDir == null) {
|
||||||
|
collectionUlogDirMap.put(coreName, curUlogDir);
|
||||||
|
} else {
|
||||||
|
assertEquals(prevUlogDir, curUlogDir);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void assertSingleReplicationAndShardSize(String collection, int numSlices) {
|
private void assertSingleReplicationAndShardSize(String collection, int numSlices) {
|
||||||
|
|
Loading…
Reference in New Issue