SOLR-15122 Replace sleeps with phaser await (#2291)

This commit is contained in:
Mike Drob 2021-02-03 19:39:04 -06:00 committed by GitHub
parent 40c5d6b750
commit 8fccdfe353
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 64 additions and 96 deletions

View File

@ -30,6 +30,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException; import java.io.IOException;
import java.lang.invoke.MethodHandles; import java.lang.invoke.MethodHandles;
import java.util.Set; import java.util.Set;
import java.util.concurrent.Phaser;
/** /**
* This implementation allows Solr to dynamically change the underlying implementation * This implementation allows Solr to dynamically change the underlying implementation
@ -40,7 +41,7 @@ public final class DelegatingClusterEventProducer extends ClusterEventProducerBa
private ClusterEventProducer delegate; private ClusterEventProducer delegate;
// support for tests to make sure the update is completed // support for tests to make sure the update is completed
private volatile int version; private volatile Phaser phaser;
public DelegatingClusterEventProducer(CoreContainer cc) { public DelegatingClusterEventProducer(CoreContainer cc) {
super(cc); super(cc);
@ -56,6 +57,16 @@ public final class DelegatingClusterEventProducer extends ClusterEventProducerBa
super.close(); super.close();
} }
/**
* A phaser that will advance phases every time {@link #setDelegate(ClusterEventProducer)} is called.
* Useful for allowing tests to know when a new delegate is finished getting set.
*/
@VisibleForTesting
public void setDelegationPhaser(Phaser phaser) {
phaser.register();
this.phaser = phaser;
}
public void setDelegate(ClusterEventProducer newDelegate) { public void setDelegate(ClusterEventProducer newDelegate) {
if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {
log.debug("--setting new delegate for CC-{}: {}", Integer.toHexString(cc.hashCode()), newDelegate); log.debug("--setting new delegate for CC-{}: {}", Integer.toHexString(cc.hashCode()), newDelegate);
@ -90,7 +101,11 @@ public final class DelegatingClusterEventProducer extends ClusterEventProducerBa
log.debug("--- delegate {} already in state {}", delegate, delegate.getState()); log.debug("--- delegate {} already in state {}", delegate, delegate.getState());
} }
} }
this.version++; Phaser localPhaser = phaser; // volatile read
if (localPhaser != null) {
assert localPhaser.getRegisteredParties() == 1;
localPhaser.arrive(); // we should be the only ones registered, so this will advance phase each time
}
} }
@Override @Override
@ -142,9 +157,4 @@ public final class DelegatingClusterEventProducer extends ClusterEventProducerBa
delegate.stop(); delegate.stop();
state = delegate.getState(); state = delegate.getState();
} }
@VisibleForTesting
public int getVersion() {
return version;
}
} }

View File

@ -21,14 +21,15 @@ import org.apache.solr.cluster.placement.PlacementPlugin;
import org.apache.solr.cluster.placement.PlacementPluginConfig; import org.apache.solr.cluster.placement.PlacementPluginConfig;
import org.apache.solr.cluster.placement.PlacementPluginFactory; import org.apache.solr.cluster.placement.PlacementPluginFactory;
import java.util.concurrent.Phaser;
/** /**
* Helper class to support dynamic reloading of plugin implementations. * Helper class to support dynamic reloading of plugin implementations.
*/ */
public final class DelegatingPlacementPluginFactory implements PlacementPluginFactory<PlacementPluginFactory.NoConfig> { public final class DelegatingPlacementPluginFactory implements PlacementPluginFactory<PlacementPluginFactory.NoConfig> {
private volatile PlacementPluginFactory<? extends PlacementPluginConfig> delegate; private volatile PlacementPluginFactory<? extends PlacementPluginConfig> delegate;
// support for tests to make sure the update is completed // support for tests to make sure the update is completed
private volatile int version; private volatile Phaser phaser;
@Override @Override
public PlacementPlugin createPluginInstance() { public PlacementPlugin createPluginInstance() {
@ -39,18 +40,27 @@ public final class DelegatingPlacementPluginFactory implements PlacementPluginFa
} }
} }
/**
* A phaser that will advance phases every time {@link #setDelegate(PlacementPluginFactory)} is called.
* Useful for allowing tests to know when a new delegate is finished getting set.
*/
@VisibleForTesting
public void setDelegationPhaser(Phaser phaser) {
phaser.register();
this.phaser = phaser;
}
public void setDelegate(PlacementPluginFactory<? extends PlacementPluginConfig> delegate) { public void setDelegate(PlacementPluginFactory<? extends PlacementPluginConfig> delegate) {
this.delegate = delegate; this.delegate = delegate;
this.version++; Phaser localPhaser = phaser; // volatile read
if (localPhaser != null) {
assert localPhaser.getRegisteredParties() == 1;
localPhaser.arrive(); // we should be the only ones registered, so this will advance phase each time
}
} }
@VisibleForTesting @VisibleForTesting
public PlacementPluginFactory<? extends PlacementPluginConfig> getDelegate() { public PlacementPluginFactory<? extends PlacementPluginConfig> getDelegate() {
return delegate; return delegate;
} }
@VisibleForTesting
public int getVersion() {
return version;
}
} }

View File

@ -23,15 +23,12 @@ import org.apache.solr.client.solrj.request.V2Request;
import org.apache.solr.client.solrj.request.beans.PluginMeta; import org.apache.solr.client.solrj.request.beans.PluginMeta;
import org.apache.solr.client.solrj.response.V2Response; import org.apache.solr.client.solrj.response.V2Response;
import org.apache.solr.cloud.ClusterSingleton; import org.apache.solr.cloud.ClusterSingleton;
import org.apache.solr.cloud.Overseer;
import org.apache.solr.cloud.SolrCloudTestCase; import org.apache.solr.cloud.SolrCloudTestCase;
import org.apache.solr.cluster.events.impl.DefaultClusterEventProducer; import org.apache.solr.cluster.events.impl.DefaultClusterEventProducer;
import org.apache.solr.cluster.events.impl.DelegatingClusterEventProducer; import org.apache.solr.cluster.events.impl.DelegatingClusterEventProducer;
import org.apache.solr.common.cloud.ClusterProperties; import org.apache.solr.common.cloud.ClusterProperties;
import org.apache.solr.common.util.TimeSource;
import org.apache.solr.common.util.Utils; import org.apache.solr.common.util.Utils;
import org.apache.solr.util.LogLevel; import org.apache.solr.util.LogLevel;
import org.apache.solr.util.TimeOut;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
import org.junit.BeforeClass; import org.junit.BeforeClass;
@ -47,8 +44,8 @@ import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import static java.util.Collections.singletonMap; import static java.util.Collections.singletonMap;
import static org.apache.solr.client.solrj.SolrRequest.METHOD.GET; import static org.apache.solr.client.solrj.SolrRequest.METHOD.GET;
@ -59,9 +56,8 @@ import static org.apache.solr.client.solrj.SolrRequest.METHOD.POST;
*/ */
@LogLevel("org.apache.solr.cluster.events=DEBUG") @LogLevel("org.apache.solr.cluster.events=DEBUG")
public class ClusterEventProducerTest extends SolrCloudTestCase { public class ClusterEventProducerTest extends SolrCloudTestCase {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private AllEventsListener eventsListener; private AllEventsListener eventsListener;
private Phaser phaser;
@BeforeClass @BeforeClass
public static void setupCluster() throws Exception { public static void setupCluster() throws Exception {
@ -77,6 +73,12 @@ public class ClusterEventProducerTest extends SolrCloudTestCase {
cluster.deleteAllCollections(); cluster.deleteAllCollections();
eventsListener = new AllEventsListener(); eventsListener = new AllEventsListener();
cluster.getOpenOverseer().getCoreContainer().getClusterEventProducer().registerListener(eventsListener); cluster.getOpenOverseer().getCoreContainer().getClusterEventProducer().registerListener(eventsListener);
ClusterEventProducer clusterEventProducer = cluster.getOpenOverseer().getCoreContainer().getClusterEventProducer();
assertTrue("not a delegating producer? " + clusterEventProducer.getClass(),
clusterEventProducer instanceof DelegatingClusterEventProducer);
DelegatingClusterEventProducer wrapper = (DelegatingClusterEventProducer) clusterEventProducer;
phaser = new Phaser();
wrapper.setDelegationPhaser(phaser);
} }
@After @After
@ -102,7 +104,7 @@ public class ClusterEventProducerTest extends SolrCloudTestCase {
@Test @Test
public void testEvents() throws Exception { public void testEvents() throws Exception {
int version = waitForVersionChange(-1, 10); int version = phaser.getPhase();
PluginMeta plugin = new PluginMeta(); PluginMeta plugin = new PluginMeta();
plugin.klass = DefaultClusterEventProducer.class.getName(); plugin.klass = DefaultClusterEventProducer.class.getName();
@ -114,7 +116,7 @@ public class ClusterEventProducerTest extends SolrCloudTestCase {
V2Response rsp = req.process(cluster.getSolrClient()); V2Response rsp = req.process(cluster.getSolrClient());
assertEquals(0, rsp.getStatus()); assertEquals(0, rsp.getStatus());
version = waitForVersionChange(version, 10); phaser.awaitAdvanceInterruptibly(version, 10, TimeUnit.SECONDS);
// NODES_DOWN // NODES_DOWN
@ -281,7 +283,7 @@ public class ClusterEventProducerTest extends SolrCloudTestCase {
@Test @Test
public void testListenerPlugins() throws Exception { public void testListenerPlugins() throws Exception {
int version = waitForVersionChange(-1, 10); int version = phaser.getPhase();
PluginMeta plugin = new PluginMeta(); PluginMeta plugin = new PluginMeta();
plugin.klass = DefaultClusterEventProducer.class.getName(); plugin.klass = DefaultClusterEventProducer.class.getName();
@ -292,7 +294,7 @@ public class ClusterEventProducerTest extends SolrCloudTestCase {
.build(); .build();
V2Response rsp = req.process(cluster.getSolrClient()); V2Response rsp = req.process(cluster.getSolrClient());
assertEquals(0, rsp.getStatus()); assertEquals(0, rsp.getStatus());
version = waitForVersionChange(-1, 10); version = phaser.awaitAdvanceInterruptibly(version, 10, TimeUnit.SECONDS);
plugin = new PluginMeta(); plugin = new PluginMeta();
plugin.name = "testplugin"; plugin.name = "testplugin";
@ -350,7 +352,7 @@ public class ClusterEventProducerTest extends SolrCloudTestCase {
.withPayload(Collections.singletonMap("remove", ClusterEventProducer.PLUGIN_NAME)) .withPayload(Collections.singletonMap("remove", ClusterEventProducer.PLUGIN_NAME))
.build(); .build();
req.process(cluster.getSolrClient()); req.process(cluster.getSolrClient());
version = waitForVersionChange(-1, 10); version = phaser.awaitAdvanceInterruptibly(version, 10, TimeUnit.SECONDS);
dummyEventLatch = new CountDownLatch(1); dummyEventLatch = new CountDownLatch(1);
lastEvent = null; lastEvent = null;
@ -371,7 +373,7 @@ public class ClusterEventProducerTest extends SolrCloudTestCase {
.build(); .build();
rsp = req.process(cluster.getSolrClient()); rsp = req.process(cluster.getSolrClient());
assertEquals(0, rsp.getStatus()); assertEquals(0, rsp.getStatus());
version = waitForVersionChange(-1, 10); phaser.awaitAdvanceInterruptibly(version, 10, TimeUnit.SECONDS);
dummyEventLatch = new CountDownLatch(1); dummyEventLatch = new CountDownLatch(1);
lastEvent = null; lastEvent = null;
@ -384,29 +386,4 @@ public class ClusterEventProducerTest extends SolrCloudTestCase {
assertNotNull("lastEvent should be COLLECTIONS_REMOVED", lastEvent); assertNotNull("lastEvent should be COLLECTIONS_REMOVED", lastEvent);
assertEquals("lastEvent should be COLLECTIONS_REMOVED", ClusterEvent.EventType.COLLECTIONS_REMOVED, lastEvent.getType()); assertEquals("lastEvent should be COLLECTIONS_REMOVED", ClusterEvent.EventType.COLLECTIONS_REMOVED, lastEvent.getType());
} }
private int waitForVersionChange(int currentVersion, int timeoutSec) throws Exception {
TimeOut timeout = new TimeOut(timeoutSec, TimeUnit.SECONDS, TimeSource.NANO_TIME);
Overseer overseer = cluster.getOpenOverseer();
if (overseer == null) {
throw new Exception("no overseer");
}
ClusterEventProducer clusterEventProducer = overseer.getCoreContainer().getClusterEventProducer();
assertTrue("not a delegating producer? " + clusterEventProducer.getClass(),
clusterEventProducer instanceof DelegatingClusterEventProducer);
DelegatingClusterEventProducer wrapper = (DelegatingClusterEventProducer) clusterEventProducer;
while (!timeout.hasTimedOut()) {
int newVersion = wrapper.getVersion();
if (newVersion < currentVersion) {
throw new Exception("Invalid version - went back! currentVersion=" + currentVersion +
" newVersion=" + newVersion);
} else if (currentVersion < newVersion) {
log.debug("--current version was {}, new version is {}", currentVersion, newVersion);
return newVersion;
}
timeout.sleep(200);
}
throw new TimeoutException("version didn't change in time, currentVersion=" + currentVersion);
}
} }

View File

@ -41,24 +41,20 @@ import org.apache.solr.cloud.MiniSolrCloudCluster;
import org.apache.solr.cluster.placement.plugins.MinimizeCoresPlacementFactory; import org.apache.solr.cluster.placement.plugins.MinimizeCoresPlacementFactory;
import org.apache.solr.common.cloud.ClusterState; import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection; import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.util.TimeSource;
import org.apache.solr.core.CoreContainer; import org.apache.solr.core.CoreContainer;
import org.apache.solr.util.LogLevel; import org.apache.solr.util.LogLevel;
import org.apache.solr.util.TimeOut;
import org.junit.After; import org.junit.After;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.junit.Test; import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.invoke.MethodHandles;
import java.util.Arrays; import java.util.Arrays;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.Map; import java.util.Map;
import java.util.Optional; import java.util.Optional;
import java.util.Set; import java.util.Set;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
@ -70,8 +66,6 @@ import static java.util.Collections.singletonMap;
*/ */
@LogLevel("org.apache.solr.cluster.placement.impl=DEBUG") @LogLevel("org.apache.solr.cluster.placement.impl=DEBUG")
public class PlacementPluginIntegrationTest extends SolrCloudTestCase { public class PlacementPluginIntegrationTest extends SolrCloudTestCase {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private static final String COLLECTION = PlacementPluginIntegrationTest.class.getSimpleName() + "_collection"; private static final String COLLECTION = PlacementPluginIntegrationTest.class.getSimpleName() + "_collection";
private static SolrCloudManager cloudManager; private static SolrCloudManager cloudManager;
@ -145,14 +139,15 @@ public class PlacementPluginIntegrationTest extends SolrCloudTestCase {
} }
@Test @Test
@SuppressWarnings("unchecked")
public void testDynamicReconfiguration() throws Exception { public void testDynamicReconfiguration() throws Exception {
PlacementPluginFactory<? extends PlacementPluginConfig> pluginFactory = cc.getPlacementPluginFactory(); PlacementPluginFactory<? extends PlacementPluginConfig> pluginFactory = cc.getPlacementPluginFactory();
assertTrue("wrong type " + pluginFactory.getClass().getName(), pluginFactory instanceof DelegatingPlacementPluginFactory); assertTrue("wrong type " + pluginFactory.getClass().getName(), pluginFactory instanceof DelegatingPlacementPluginFactory);
DelegatingPlacementPluginFactory wrapper = (DelegatingPlacementPluginFactory) pluginFactory; DelegatingPlacementPluginFactory wrapper = (DelegatingPlacementPluginFactory) pluginFactory;
Phaser phaser = new Phaser();
wrapper.setDelegationPhaser(phaser);
int version = wrapper.getVersion(); int version = phaser.getPhase();
log.debug("--initial version={}", version); assertTrue("wrong version " + version, version > -1);
PluginMeta plugin = new PluginMeta(); PluginMeta plugin = new PluginMeta();
plugin.name = PlacementPluginFactory.PLUGIN_NAME; plugin.name = PlacementPluginFactory.PLUGIN_NAME;
@ -164,9 +159,7 @@ public class PlacementPluginIntegrationTest extends SolrCloudTestCase {
.build(); .build();
req.process(cluster.getSolrClient()); req.process(cluster.getSolrClient());
version = waitForVersionChange(version, wrapper, 10); version = phaser.awaitAdvanceInterruptibly(version, 10, TimeUnit.SECONDS);
assertTrue("wrong version " + version, version > 0);
PlacementPluginFactory<? extends PlacementPluginConfig> factory = wrapper.getDelegate(); PlacementPluginFactory<? extends PlacementPluginConfig> factory = wrapper.getDelegate();
assertTrue("wrong type " + factory.getClass().getName(), factory instanceof MinimizeCoresPlacementFactory); assertTrue("wrong type " + factory.getClass().getName(), factory instanceof MinimizeCoresPlacementFactory);
@ -180,7 +173,7 @@ public class PlacementPluginIntegrationTest extends SolrCloudTestCase {
.build(); .build();
req.process(cluster.getSolrClient()); req.process(cluster.getSolrClient());
version = waitForVersionChange(version, wrapper, 10); version = phaser.awaitAdvanceInterruptibly(version, 10, TimeUnit.SECONDS);
factory = wrapper.getDelegate(); factory = wrapper.getDelegate();
assertTrue("wrong type " + factory.getClass().getName(), factory instanceof AffinityPlacementFactory); assertTrue("wrong type " + factory.getClass().getName(), factory instanceof AffinityPlacementFactory);
@ -197,7 +190,7 @@ public class PlacementPluginIntegrationTest extends SolrCloudTestCase {
.build(); .build();
req.process(cluster.getSolrClient()); req.process(cluster.getSolrClient());
version = waitForVersionChange(version, wrapper, 10); version = phaser.awaitAdvanceInterruptibly(version, 10, TimeUnit.SECONDS);
factory = wrapper.getDelegate(); factory = wrapper.getDelegate();
assertTrue("wrong type " + factory.getClass().getName(), factory instanceof AffinityPlacementFactory); assertTrue("wrong type " + factory.getClass().getName(), factory instanceof AffinityPlacementFactory);
config = ((AffinityPlacementFactory) factory).getConfig(); config = ((AffinityPlacementFactory) factory).getConfig();
@ -212,14 +205,8 @@ public class PlacementPluginIntegrationTest extends SolrCloudTestCase {
.withPayload(singletonMap("add", plugin)) .withPayload(singletonMap("add", plugin))
.build(); .build();
req.process(cluster.getSolrClient()); req.process(cluster.getSolrClient());
try { final int oldVersion = version;
int newVersion = waitForVersionChange(version, wrapper, 5); expectThrows(TimeoutException.class, () -> phaser.awaitAdvanceInterruptibly(oldVersion, 5, TimeUnit.SECONDS));
if (newVersion != version) {
fail("factory configuration updated but plugin name was wrong: " + plugin);
}
} catch (TimeoutException te) {
// expected
}
// remove plugin // remove plugin
req = new V2Request.Builder("/cluster/plugin") req = new V2Request.Builder("/cluster/plugin")
.forceV2(true) .forceV2(true)
@ -227,7 +214,7 @@ public class PlacementPluginIntegrationTest extends SolrCloudTestCase {
.withPayload("{remove: '" + PlacementPluginFactory.PLUGIN_NAME + "'}") .withPayload("{remove: '" + PlacementPluginFactory.PLUGIN_NAME + "'}")
.build(); .build();
req.process(cluster.getSolrClient()); req.process(cluster.getSolrClient());
waitForVersionChange(version, wrapper, 10); phaser.awaitAdvanceInterruptibly(version, 10, TimeUnit.SECONDS);
factory = wrapper.getDelegate(); factory = wrapper.getDelegate();
assertNull("no factory should be present", factory); assertNull("no factory should be present", factory);
} }
@ -237,9 +224,10 @@ public class PlacementPluginIntegrationTest extends SolrCloudTestCase {
PlacementPluginFactory<? extends PlacementPluginConfig> pluginFactory = cc.getPlacementPluginFactory(); PlacementPluginFactory<? extends PlacementPluginConfig> pluginFactory = cc.getPlacementPluginFactory();
assertTrue("wrong type " + pluginFactory.getClass().getName(), pluginFactory instanceof DelegatingPlacementPluginFactory); assertTrue("wrong type " + pluginFactory.getClass().getName(), pluginFactory instanceof DelegatingPlacementPluginFactory);
DelegatingPlacementPluginFactory wrapper = (DelegatingPlacementPluginFactory) pluginFactory; DelegatingPlacementPluginFactory wrapper = (DelegatingPlacementPluginFactory) pluginFactory;
Phaser phaser = new Phaser();
wrapper.setDelegationPhaser(phaser);
int version = wrapper.getVersion(); int version = phaser.getPhase();
log.debug("--initial version={}", version);
Set<String> nodeSet = new HashSet<>(); Set<String> nodeSet = new HashSet<>();
for (String node : cloudManager.getClusterStateProvider().getLiveNodes()) { for (String node : cloudManager.getClusterStateProvider().getLiveNodes()) {
@ -261,7 +249,7 @@ public class PlacementPluginIntegrationTest extends SolrCloudTestCase {
.build(); .build();
req.process(cluster.getSolrClient()); req.process(cluster.getSolrClient());
version = waitForVersionChange(version, wrapper, 10); phaser.awaitAdvanceInterruptibly(version, 10, TimeUnit.SECONDS);
CollectionAdminResponse rsp = CollectionAdminRequest.createCollection(SECONDARY_COLLECTION, "conf", 1, 3) CollectionAdminResponse rsp = CollectionAdminRequest.createCollection(SECONDARY_COLLECTION, "conf", 1, 3)
.process(cluster.getSolrClient()); .process(cluster.getSolrClient());
@ -398,21 +386,4 @@ public class PlacementPluginIntegrationTest extends SolrCloudTestCase {
}); });
}); });
} }
private int waitForVersionChange(int currentVersion, DelegatingPlacementPluginFactory wrapper, int timeoutSec) throws Exception {
TimeOut timeout = new TimeOut(timeoutSec, TimeUnit.SECONDS, TimeSource.NANO_TIME);
while (!timeout.hasTimedOut()) {
int newVersion = wrapper.getVersion();
if (newVersion < currentVersion) {
throw new Exception("Invalid version - went back! currentVersion=" + currentVersion +
" newVersion=" + newVersion);
} else if (currentVersion < newVersion) {
log.debug("--current version was {}, new version is {}", currentVersion, newVersion);
return newVersion;
}
timeout.sleep(200);
}
throw new TimeoutException("version didn't change in time, currentVersion=" + currentVersion);
}
} }