SOLR-9236: AutoAddReplicas will append an extra /tlog to the update log location on replica failover.

This commit is contained in:
markrmiller 2016-07-06 11:14:01 -04:00
parent 1bd5d17891
commit 546093812c
3 changed files with 142 additions and 8 deletions

View File

@ -134,6 +134,9 @@ Bug Fixes
* SOLR-8858: SolrIndexSearcher#doc() completely ignores field filters unless lazy field loading is enabled.
(Caleb Rackliffe, David Smiley, shalin)
* SOLR-9236: AutoAddReplicas will append an extra /tlog to the update log location on replica failover.
(Eungsop Yoo, Mark Miller)
Optimizations
----------------------

View File

@ -451,7 +451,7 @@ public class OverseerAutoReplicaFailoverThread implements Runnable, Closeable {
// for now, the collections API will use unique names
createCmd.setCoreName(coreName);
createCmd.setDataDir(dataDir);
createCmd.setUlogDir(ulogDir);
createCmd.setUlogDir(ulogDir.substring(0, ulogDir.length() - "/tlog".length()));
client.request(createCmd);
} catch (Exception e) {
SolrException.log(log, "Exception trying to create new replica on " + createUrl, e);

View File

@ -18,7 +18,9 @@ package org.apache.solr.cloud;
import static org.apache.solr.common.util.Utils.makeMap;
import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
@ -30,29 +32,36 @@ import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.lucene.util.LuceneTestCase.Nightly;
import org.apache.lucene.util.LuceneTestCase.Slow;
import org.apache.solr.SolrTestCaseJ4.SuppressSSL;
import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.request.CollectionAdminRequest.Create;
import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.client.solrj.response.CollectionAdminResponse;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.cloud.hdfs.HdfsTestUtil;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.cloud.ClusterStateUtil;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.CollectionParams;
import org.apache.solr.common.params.MapSolrParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.core.CoreDescriptor;
import org.apache.solr.update.DirectUpdateHandler2;
import org.apache.solr.util.DefaultSolrThreadFactory;
import org.apache.solr.util.BadHdfsThreadsFilter;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
@Nightly
@Slow
@SuppressSSL
@ -70,11 +79,13 @@ public class SharedFSAutoReplicaFailoverTest extends AbstractFullDistribZkTestBa
CompletionService<Object> completionService;
Set<Future<Object>> pending;
private final Map<String, String> collectionUlogDirMap = new HashMap<>();
@BeforeClass
public static void hdfsFailoverBeforeClass() throws Exception {
dfsCluster = HdfsTestUtil.setupClass(createTempDir().toFile().getAbsolutePath());
schemaString = "schema15.xml";
}
@AfterClass
@ -82,6 +93,13 @@ public class SharedFSAutoReplicaFailoverTest extends AbstractFullDistribZkTestBa
HdfsTestUtil.teardownClass(dfsCluster);
dfsCluster = null;
}
@Before
@Override
public void setUp() throws Exception {
super.setUp();
collectionUlogDirMap.clear();
}
@Override
public void distribSetUp() throws Exception {
@ -95,7 +113,6 @@ public class SharedFSAutoReplicaFailoverTest extends AbstractFullDistribZkTestBa
public SharedFSAutoReplicaFailoverTest() {
sliceCount = 2;
completionService = new ExecutorCompletionService<>(executor);
pending = new HashSet<>();
}
@ -104,8 +121,11 @@ public class SharedFSAutoReplicaFailoverTest extends AbstractFullDistribZkTestBa
@ShardsFixed(num = 4)
public void test() throws Exception {
try {
// to keep uncommitted docs during failover
DirectUpdateHandler2.commitOnClose = false;
testBasics();
} finally {
DirectUpdateHandler2.commitOnClose = true;
if (DEBUG) {
super.printLayout();
}
@ -162,6 +182,35 @@ public class SharedFSAutoReplicaFailoverTest extends AbstractFullDistribZkTestBa
waitForRecoveriesToFinish(collection3, false);
// a collection has only 1 replica per a shard
String collection4 = "solrj_collection4";
createCollectionRequest = new Create()
.setCollectionName(collection4)
.setNumShards(5)
.setReplicationFactor(1)
.setMaxShardsPerNode(5)
.setConfigName("conf1")
.setRouterField("text")
.setAutoAddReplicas(true);
CollectionAdminResponse response4 = createCollectionRequest.process(getCommonCloudSolrClient());
assertEquals(0, response4.getStatus());
assertTrue(response4.isSuccess());
waitForRecoveriesToFinish(collection4, false);
// all collections
String[] collections = {collection1, collection2, collection3, collection4};
// add some documents to collection4
final int numDocs = 100;
addDocs(collection4, numDocs, false); // indexed but not committed
// no result because not committed yet
queryAndAssertResultSize(collection4, 0, 10000);
assertUlogDir(collections);
ChaosMonkey.stop(jettys.get(1));
ChaosMonkey.stop(jettys.get(2));
@ -176,12 +225,22 @@ public class SharedFSAutoReplicaFailoverTest extends AbstractFullDistribZkTestBa
// collection3 has maxShardsPerNode=1, there are 4 standard jetties and one control jetty and 2 nodes stopped
ClusterStateUtil.waitForLiveAndActiveReplicaCount(cloudClient.getZkStateReader(), collection3, 3, 30000);
// collection4 has maxShardsPerNode=5 and setMaxShardsPerNode=5
ClusterStateUtil.waitForLiveAndActiveReplicaCount(cloudClient.getZkStateReader(), collection4, 5, 30000);
// all docs should be queried after failover
cloudClient.commit(); // to query all docs
assertSingleReplicationAndShardSize(collection4, 5);
queryAndAssertResultSize(collection4, numDocs, 10000);
// collection1 should still be at 4
assertEquals(4, ClusterStateUtil.getLiveAndActiveReplicaCount(cloudClient.getZkStateReader(), collection1));
// and collection2 less than 4
assertTrue(ClusterStateUtil.getLiveAndActiveReplicaCount(cloudClient.getZkStateReader(), collection2) < 4);
assertUlogDir(collections);
ChaosMonkey.stop(jettys);
ChaosMonkey.stop(controlJetty);
@ -194,7 +253,13 @@ public class SharedFSAutoReplicaFailoverTest extends AbstractFullDistribZkTestBa
assertSliceAndReplicaCount(collection1);
assertSingleReplicationAndShardSize(collection3, 5);
// all docs should be queried
assertSingleReplicationAndShardSize(collection4, 5);
queryAndAssertResultSize(collection4, numDocs, 10000);
assertUlogDir(collections);
int jettyIndex = random().nextInt(jettys.size());
ChaosMonkey.stop(jettys.get(jettyIndex));
ChaosMonkey.start(jettys.get(jettyIndex));
@ -202,9 +267,15 @@ public class SharedFSAutoReplicaFailoverTest extends AbstractFullDistribZkTestBa
assertTrue("Timeout waiting for all live and active", ClusterStateUtil.waitForAllActiveAndLiveReplicas(cloudClient.getZkStateReader(), collection1, 60000));
assertSliceAndReplicaCount(collection1);
assertUlogDir(collections);
assertSingleReplicationAndShardSize(collection3, 5);
ClusterStateUtil.waitForLiveAndActiveReplicaCount(cloudClient.getZkStateReader(), collection3, 5, 30000);
assertSingleReplicationAndShardSize(collection4, 5);
ClusterStateUtil.waitForLiveAndActiveReplicaCount(cloudClient.getZkStateReader(), collection4, 5, 30000);
//disable autoAddReplicas
Map m = makeMap(
"action", CollectionParams.CollectionAction.CLUSTERPROP.toLower(),
@ -236,8 +307,68 @@ public class SharedFSAutoReplicaFailoverTest extends AbstractFullDistribZkTestBa
assertTrue("Timeout waiting for all live and active", ClusterStateUtil.waitForAllActiveAndLiveReplicas(cloudClient.getZkStateReader(), collection1, 60000));
assertSliceAndReplicaCount(collection1);
assertUlogDir(collections);
}
private void queryAndAssertResultSize(String collection, int expectedResultSize, int timeoutMS)
throws SolrServerException, IOException, InterruptedException {
long startTimestamp = System.currentTimeMillis();
long actualResultSize = 0;
while(true) {
if (System.currentTimeMillis() - startTimestamp > timeoutMS || actualResultSize > expectedResultSize) {
fail("expected: " + expectedResultSize + ", actual: " + actualResultSize);
}
SolrParams queryAll = new SolrQuery("*:*");
cloudClient.setDefaultCollection(collection);
QueryResponse queryResponse = cloudClient.query(queryAll);
actualResultSize = queryResponse.getResults().getNumFound();
if(expectedResultSize == actualResultSize) {
return;
}
Thread.sleep(1000);
}
}
private void addDocs(String collection, int numDocs, boolean commit) throws SolrServerException, IOException {
for (int docId = 1; docId <= numDocs; docId++) {
SolrInputDocument doc = new SolrInputDocument();
doc.addField("id", docId);
doc.addField("text", "shard" + docId % 5);
cloudClient.setDefaultCollection(collection);
cloudClient.add(doc);
}
if (commit) {
cloudClient.commit();
}
}
/**
* After failover, ulogDir should not be changed.
*/
private void assertUlogDir(String... collections) {
for (String collection : collections) {
Collection<Slice> slices = cloudClient.getZkStateReader().getClusterState().getCollection(collection).getSlices();
for (Slice slice : slices) {
for (Replica replica : slice.getReplicas()) {
Map<String, Object> properties = replica.getProperties();
String coreName = replica.getCoreName();
String curUlogDir = (String) properties.get(CoreDescriptor.CORE_ULOGDIR);
String prevUlogDir = collectionUlogDirMap.get(coreName);
if (curUlogDir != null) {
if (prevUlogDir == null) {
collectionUlogDirMap.put(coreName, curUlogDir);
} else {
assertEquals(prevUlogDir, curUlogDir);
}
}
}
}
}
}
private void assertSingleReplicationAndShardSize(String collection, int numSlices) {
Collection<Slice> slices;
slices = cloudClient.getZkStateReader().getClusterState().getActiveSlices(collection);