This closes #804

This commit is contained in:
Clebert Suconic 2016-09-27 09:15:04 -04:00
commit e09a5acbdc
19 changed files with 118 additions and 48 deletions

View File

@ -0,0 +1,42 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.utils;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
public class ConcurrentUtil {
/**
* Await for condition, handling
* <a href="http://errorprone.info/bugpattern/WaitNotInLoop">spurious wakeups</a>.
* @param condition condition to await for
* @param timeout the maximum time to wait in milliseconds
* @return value from {@link Condition#await(long, TimeUnit)}
*/
public static boolean await(final Condition condition, final long timeout) throws InterruptedException {
boolean awaited = false;
long timeoutRemaining = timeout;
long awaitStarted = System.currentTimeMillis();
while (!awaited && timeoutRemaining > 0) {
awaited = condition.await(timeoutRemaining, TimeUnit.MILLISECONDS);
timeoutRemaining -= System.currentTimeMillis() - awaitStarted;
}
return awaited;
}
}

View File

@ -39,6 +39,7 @@ import org.apache.activemq.artemis.core.protocol.core.Packet;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ActiveMQExceptionMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.PacketsConfirmedMessage;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.utils.ConcurrentUtil;
import org.jboss.logging.Logger;
public final class ChannelImpl implements Channel {
@ -235,6 +236,25 @@ public final class ChannelImpl implements Channel {
this.transferring = transferring;
}
/**
* @param timeoutMsg message to log on blocking call failover timeout
*/
private void waitForFailOver(String timeoutMsg) {
try {
if (connection.getBlockingCallFailoverTimeout() < 0) {
while (failingOver) {
failoverCondition.await();
}
}
else if (!ConcurrentUtil.await(failoverCondition, connection.getBlockingCallFailoverTimeout())) {
logger.debug(timeoutMsg);
}
}
catch (InterruptedException e) {
throw new ActiveMQInterruptedException(e);
}
}
// This must never called by more than one thread concurrently
private boolean send(final Packet packet, final int reconnectID, final boolean flush, final boolean batch) {
if (invokeInterceptors(packet, interceptors, connection) != null) {
@ -254,19 +274,7 @@ public final class ChannelImpl implements Channel {
try {
if (failingOver) {
try {
if (connection.getBlockingCallFailoverTimeout() < 0) {
failoverCondition.await();
}
else {
if (!failoverCondition.await(connection.getBlockingCallFailoverTimeout(), TimeUnit.MILLISECONDS)) {
logger.debug("timed-out waiting for fail-over condition on non-blocking send");
}
}
}
catch (InterruptedException e) {
throw new ActiveMQInterruptedException(e);
}
waitForFailOver("timed-out waiting for fail-over condition on non-blocking send");
}
// Sanity check
@ -340,21 +348,7 @@ public final class ChannelImpl implements Channel {
try {
if (failingOver) {
try {
if (connection.getBlockingCallFailoverTimeout() < 0) {
while (failingOver) {
failoverCondition.await();
}
}
else {
if (!failoverCondition.await(connection.getBlockingCallFailoverTimeout(), TimeUnit.MILLISECONDS)) {
logger.debug("timed-out waiting for fail-over condition on blocking send");
}
}
}
catch (InterruptedException e) {
throw new ActiveMQInterruptedException(e);
}
waitForFailOver("timed-out waiting for fail-over condition on blocking send");
}
response = null;

View File

@ -109,6 +109,7 @@ public class ActiveMQConsumerResource extends AbstractActiveMQClientResource {
}
}
@Override
public boolean isAutoCreateQueue() {
return autoCreateQueue;
}
@ -118,6 +119,7 @@ public class ActiveMQConsumerResource extends AbstractActiveMQClientResource {
*
* @param autoCreateQueue
*/
@Override
public void setAutoCreateQueue(boolean autoCreateQueue) {
this.autoCreateQueue = autoCreateQueue;
}

View File

@ -55,6 +55,16 @@
</resource>
</resources>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<compilerArgs>
<!-- TODO: do this only for generated-sources -->
<arg>-Xep:MissingOverride:WARN</arg>
</compilerArgs>
</configuration>
</plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>javacc-maven-plugin</artifactId>

View File

@ -38,6 +38,7 @@ import org.apache.activemq.artemis.core.postoffice.BindingType;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.management.ManagementService;
import org.apache.activemq.artemis.core.server.management.Notification;
import org.apache.activemq.artemis.utils.ConcurrentUtil;
import org.apache.activemq.artemis.utils.ExecutorFactory;
import org.apache.activemq.artemis.utils.TypedProperties;
import org.jboss.logging.Logger;
@ -267,11 +268,10 @@ public final class LocalGroupingHandler extends GroupHandlingAbstract {
if (expectedBindings.size() > 0) {
logger.debug("Waiting remote group bindings to arrive before starting the server. timeout=" + timeout + " milliseconds");
//now we wait here for the rest to be received in onNotification, it will signal once all have been received.
//if we arent signaled then bindingsAdded still has some groupids we need to remove.
if (!awaitCondition.await(timeout, TimeUnit.MILLISECONDS)) {
//if we aren't signaled then bindingsAdded still has some groupids we need to remove.
if (!ConcurrentUtil.await(awaitCondition, timeout)) {
ActiveMQServerLogger.LOGGER.remoteGroupCoordinatorsNotStarted();
}
}
}
}

View File

@ -19,7 +19,6 @@ package org.apache.activemq.artemis.core.server.impl;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@ -30,6 +29,7 @@ import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.client.TopologyMember;
import org.apache.activemq.artemis.core.server.LiveNodeLocator;
import org.apache.activemq.artemis.core.server.cluster.qourum.SharedNothingBackupQuorum;
import org.apache.activemq.artemis.utils.ConcurrentUtil;
/**
* This implementation looks for any available live node, once tried with no success it is marked as
@ -63,10 +63,12 @@ public class AnyLiveNodeLocatorForReplication extends LiveNodeLocator {
if (untriedConnectors.isEmpty()) {
try {
if (timeout != -1L) {
condition.await(timeout, TimeUnit.MILLISECONDS);
ConcurrentUtil.await(condition, timeout);
}
else {
condition.await();
while (untriedConnectors.isEmpty()) {
condition.await();
}
}
}
catch (InterruptedException e) {
@ -153,4 +155,3 @@ public class AnyLiveNodeLocatorForReplication extends LiveNodeLocator {
super.notifyRegistrationFailed(alreadyReplicating);
}
}

View File

@ -19,7 +19,6 @@ package org.apache.activemq.artemis.core.server.impl;
import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@ -29,6 +28,7 @@ import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.client.TopologyMember;
import org.apache.activemq.artemis.core.server.LiveNodeLocator;
import org.apache.activemq.artemis.utils.ConcurrentUtil;
import org.jboss.logging.Logger;
/**
@ -65,12 +65,14 @@ public class AnyLiveNodeLocatorForScaleDown extends LiveNodeLocator {
if (connectors.isEmpty()) {
try {
if (timeout != -1L) {
if (!condition.await(timeout, TimeUnit.MILLISECONDS)) {
if (!ConcurrentUtil.await(condition, timeout)) {
throw new ActiveMQException("Timeout elapsed while waiting for cluster node");
}
}
else {
condition.await();
while (connectors.isEmpty()) {
condition.await();
}
}
}
catch (InterruptedException e) {
@ -153,4 +155,3 @@ public class AnyLiveNodeLocatorForScaleDown extends LiveNodeLocator {
}
}
}

View File

@ -16,7 +16,6 @@
*/
package org.apache.activemq.artemis.core.server.impl;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@ -27,6 +26,7 @@ import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.client.TopologyMember;
import org.apache.activemq.artemis.core.server.LiveNodeLocator;
import org.apache.activemq.artemis.core.server.cluster.qourum.SharedNothingBackupQuorum;
import org.apache.activemq.artemis.utils.ConcurrentUtil;
/**
* NamedLiveNodeLocatorForReplication looks for a live server in the cluster with a specific backupGroupName
@ -59,10 +59,12 @@ public class NamedLiveNodeLocatorForReplication extends LiveNodeLocator {
if (liveConfiguration == null) {
try {
if (timeout != -1L) {
condition.await(timeout, TimeUnit.MILLISECONDS);
ConcurrentUtil.await(condition, timeout);
}
else {
condition.await();
while (liveConfiguration == null) {
condition.await();
}
}
}
catch (InterruptedException e) {
@ -117,4 +119,3 @@ public class NamedLiveNodeLocatorForReplication extends LiveNodeLocator {
}
}
}

View File

@ -19,7 +19,6 @@ package org.apache.activemq.artemis.core.server.impl;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@ -29,6 +28,7 @@ import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.client.TopologyMember;
import org.apache.activemq.artemis.core.server.LiveNodeLocator;
import org.apache.activemq.artemis.utils.ConcurrentUtil;
import org.jboss.logging.Logger;
/**
@ -66,12 +66,14 @@ public class NamedLiveNodeLocatorForScaleDown extends LiveNodeLocator {
if (connectors.isEmpty()) {
try {
if (timeout != -1L) {
if (!condition.await(timeout, TimeUnit.MILLISECONDS)) {
if (!ConcurrentUtil.await(condition, timeout)) {
throw new ActiveMQException("Timeout elapsed while waiting for cluster node");
}
}
else {
condition.await();
while (connectors.isEmpty()) {
condition.await();
}
}
}
catch (InterruptedException e) {
@ -155,4 +157,3 @@ public class NamedLiveNodeLocatorForScaleDown extends LiveNodeLocator {
}
}
}

View File

@ -930,6 +930,7 @@
<forceJavacCompilerUse>true</forceJavacCompilerUse>
<compilerId>${javac-compiler-id}</compilerId>
<compilerArgs>
<arg>-Xep:MissingOverride:ERROR</arg>
<arg>-Xep:NonAtomicVolatileUpdate:ERROR</arg>
<arg>-Xep:SynchronizeOnNonFinalField:ERROR</arg>
<arg>-Xep:StaticAccessedFromInstance:ERROR</arg>
@ -941,7 +942,12 @@
<dependency>
<groupId>org.codehaus.plexus</groupId>
<artifactId>plexus-compiler-javac-errorprone</artifactId>
<version>2.5</version>
<version>2.8</version>
</dependency>
<dependency>
<groupId>com.google.errorprone</groupId>
<artifactId>error_prone_core</artifactId>
<version>2.0.9</version>
</dependency>
</dependencies>
</plugin>

View File

@ -255,6 +255,7 @@ public class UnmodifiableLink implements Link {
return link.detached();
}
@Override
public Record attachments() {
return link.attachments();
}

View File

@ -48,6 +48,7 @@ public class AmqpClientTestSupport extends ActiveMQTestBase {
@Before
@Override
public void setUp() throws Exception {
super.setUp();
server = createServer(true, true);
@ -55,6 +56,7 @@ public class AmqpClientTestSupport extends ActiveMQTestBase {
}
@After
@Override
public void tearDown() throws Exception {
super.tearDown();

View File

@ -38,6 +38,7 @@ public class DestinationCommandTest extends JMSTestBase {
private ByteArrayOutputStream error;
@Before
@Override
public void setUp() throws Exception {
super.setUp();
this.output = new ByteArrayOutputStream(1024);

View File

@ -35,6 +35,7 @@ import org.junit.Test;
public class ExpireWhileLoadBalanceTest extends ClusterTestBase {
@Before
@Override
public void setUp() throws Exception {
super.setUp();

View File

@ -117,6 +117,7 @@ public class GlobalPagingTest extends PagingTest {
serverImpl.getMonitor().tick();
Thread t = new Thread() {
@Override
public void run() {
try {
sendFewMessages(numberOfMessages, session, producer, body);

View File

@ -34,6 +34,7 @@ public class ProtonMaxFrameSizeTest extends ProtonTestBase {
private static final int FRAME_SIZE = 512;
@Override
protected void configureAmqp(Map<String, Object> params) {
params.put("maxFrameSize", FRAME_SIZE);
}

View File

@ -54,6 +54,7 @@ public class ProtonPubSubTest extends ProtonTestBase {
private Connection connection;
private JmsConnectionFactory factory;
@Override
protected void configureAmqp(Map<String, Object> params) {
params.put("pubSubPrefix", prefix);
}

View File

@ -44,12 +44,14 @@ public class RestDeserializationTest extends RestTestBase {
private RestAMQConnection restConnection;
@Before
@Override
public void setUp() throws Exception {
super.setUp();
createJettyServer("localhost", 12345);
}
@After
@Override
public void tearDown() throws Exception {
if (restConnection != null) {
restConnection.close();

View File

@ -42,12 +42,14 @@ public class RestTestBase extends JMSTestBase {
protected HandlerList handlers;
@Before
@Override
public void setUp() throws Exception {
super.setUp();
webAppDir = testFolder.newFolder("test-apps");
}
@After
@Override
public void tearDown() throws Exception {
if (server != null) {
try {