Fix WaitNotInLoop issues flagged by new Error Prone

This commit is contained in:
Ville Skyttä 2016-09-27 14:28:29 +03:00
parent 75b52169f8
commit 1a4fe92802
7 changed files with 87 additions and 47 deletions

View File

@ -0,0 +1,42 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.utils;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
public class ConcurrentUtil {
/**
* Await for condition, handling
* <a href="http://errorprone.info/bugpattern/WaitNotInLoop">spurious wakeups</a>.
* @param condition condition to await for
* @param timeout the maximum time to wait in milliseconds
* @return value from {@link Condition#await(long, TimeUnit)}
*/
public static boolean await(final Condition condition, final long timeout) throws InterruptedException {
boolean awaited = false;
long timeoutRemaining = timeout;
long awaitStarted = System.currentTimeMillis();
while (!awaited && timeoutRemaining > 0) {
awaited = condition.await(timeoutRemaining, TimeUnit.MILLISECONDS);
timeoutRemaining -= System.currentTimeMillis() - awaitStarted;
}
return awaited;
}
}

View File

@ -39,6 +39,7 @@ import org.apache.activemq.artemis.core.protocol.core.Packet;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ActiveMQExceptionMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.PacketsConfirmedMessage;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.utils.ConcurrentUtil;
import org.jboss.logging.Logger;
public final class ChannelImpl implements Channel {
@ -235,6 +236,25 @@ public final class ChannelImpl implements Channel {
this.transferring = transferring;
}
/**
* @param timeoutMsg message to log on blocking call failover timeout
*/
private void waitForFailOver(String timeoutMsg) {
try {
if (connection.getBlockingCallFailoverTimeout() < 0) {
while (failingOver) {
failoverCondition.await();
}
}
else if (!ConcurrentUtil.await(failoverCondition, connection.getBlockingCallFailoverTimeout())) {
logger.debug(timeoutMsg);
}
}
catch (InterruptedException e) {
throw new ActiveMQInterruptedException(e);
}
}
// This must never called by more than one thread concurrently
private boolean send(final Packet packet, final int reconnectID, final boolean flush, final boolean batch) {
if (invokeInterceptors(packet, interceptors, connection) != null) {
@ -254,19 +274,7 @@ public final class ChannelImpl implements Channel {
try {
if (failingOver) {
try {
if (connection.getBlockingCallFailoverTimeout() < 0) {
failoverCondition.await();
}
else {
if (!failoverCondition.await(connection.getBlockingCallFailoverTimeout(), TimeUnit.MILLISECONDS)) {
logger.debug("timed-out waiting for fail-over condition on non-blocking send");
}
}
}
catch (InterruptedException e) {
throw new ActiveMQInterruptedException(e);
}
waitForFailOver("timed-out waiting for fail-over condition on non-blocking send");
}
// Sanity check
@ -340,21 +348,7 @@ public final class ChannelImpl implements Channel {
try {
if (failingOver) {
try {
if (connection.getBlockingCallFailoverTimeout() < 0) {
while (failingOver) {
failoverCondition.await();
}
}
else {
if (!failoverCondition.await(connection.getBlockingCallFailoverTimeout(), TimeUnit.MILLISECONDS)) {
logger.debug("timed-out waiting for fail-over condition on blocking send");
}
}
}
catch (InterruptedException e) {
throw new ActiveMQInterruptedException(e);
}
waitForFailOver("timed-out waiting for fail-over condition on blocking send");
}
response = null;

View File

@ -38,6 +38,7 @@ import org.apache.activemq.artemis.core.postoffice.BindingType;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.management.ManagementService;
import org.apache.activemq.artemis.core.server.management.Notification;
import org.apache.activemq.artemis.utils.ConcurrentUtil;
import org.apache.activemq.artemis.utils.ExecutorFactory;
import org.apache.activemq.artemis.utils.TypedProperties;
import org.jboss.logging.Logger;
@ -267,11 +268,10 @@ public final class LocalGroupingHandler extends GroupHandlingAbstract {
if (expectedBindings.size() > 0) {
logger.debug("Waiting remote group bindings to arrive before starting the server. timeout=" + timeout + " milliseconds");
//now we wait here for the rest to be received in onNotification, it will signal once all have been received.
//if we arent signaled then bindingsAdded still has some groupids we need to remove.
if (!awaitCondition.await(timeout, TimeUnit.MILLISECONDS)) {
//if we aren't signaled then bindingsAdded still has some groupids we need to remove.
if (!ConcurrentUtil.await(awaitCondition, timeout)) {
ActiveMQServerLogger.LOGGER.remoteGroupCoordinatorsNotStarted();
}
}
}
}

View File

@ -19,7 +19,6 @@ package org.apache.activemq.artemis.core.server.impl;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@ -30,6 +29,7 @@ import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.client.TopologyMember;
import org.apache.activemq.artemis.core.server.LiveNodeLocator;
import org.apache.activemq.artemis.core.server.cluster.qourum.SharedNothingBackupQuorum;
import org.apache.activemq.artemis.utils.ConcurrentUtil;
/**
* This implementation looks for any available live node, once tried with no success it is marked as
@ -63,10 +63,12 @@ public class AnyLiveNodeLocatorForReplication extends LiveNodeLocator {
if (untriedConnectors.isEmpty()) {
try {
if (timeout != -1L) {
condition.await(timeout, TimeUnit.MILLISECONDS);
ConcurrentUtil.await(condition, timeout);
}
else {
condition.await();
while (untriedConnectors.isEmpty()) {
condition.await();
}
}
}
catch (InterruptedException e) {
@ -153,4 +155,3 @@ public class AnyLiveNodeLocatorForReplication extends LiveNodeLocator {
super.notifyRegistrationFailed(alreadyReplicating);
}
}

View File

@ -19,7 +19,6 @@ package org.apache.activemq.artemis.core.server.impl;
import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@ -29,6 +28,7 @@ import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.client.TopologyMember;
import org.apache.activemq.artemis.core.server.LiveNodeLocator;
import org.apache.activemq.artemis.utils.ConcurrentUtil;
import org.jboss.logging.Logger;
/**
@ -65,12 +65,14 @@ public class AnyLiveNodeLocatorForScaleDown extends LiveNodeLocator {
if (connectors.isEmpty()) {
try {
if (timeout != -1L) {
if (!condition.await(timeout, TimeUnit.MILLISECONDS)) {
if (!ConcurrentUtil.await(condition, timeout)) {
throw new ActiveMQException("Timeout elapsed while waiting for cluster node");
}
}
else {
condition.await();
while (connectors.isEmpty()) {
condition.await();
}
}
}
catch (InterruptedException e) {
@ -153,4 +155,3 @@ public class AnyLiveNodeLocatorForScaleDown extends LiveNodeLocator {
}
}
}

View File

@ -16,7 +16,6 @@
*/
package org.apache.activemq.artemis.core.server.impl;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@ -27,6 +26,7 @@ import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.client.TopologyMember;
import org.apache.activemq.artemis.core.server.LiveNodeLocator;
import org.apache.activemq.artemis.core.server.cluster.qourum.SharedNothingBackupQuorum;
import org.apache.activemq.artemis.utils.ConcurrentUtil;
/**
* NamedLiveNodeLocatorForReplication looks for a live server in the cluster with a specific backupGroupName
@ -59,10 +59,12 @@ public class NamedLiveNodeLocatorForReplication extends LiveNodeLocator {
if (liveConfiguration == null) {
try {
if (timeout != -1L) {
condition.await(timeout, TimeUnit.MILLISECONDS);
ConcurrentUtil.await(condition, timeout);
}
else {
condition.await();
while (liveConfiguration == null) {
condition.await();
}
}
}
catch (InterruptedException e) {
@ -117,4 +119,3 @@ public class NamedLiveNodeLocatorForReplication extends LiveNodeLocator {
}
}
}

View File

@ -19,7 +19,6 @@ package org.apache.activemq.artemis.core.server.impl;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@ -29,6 +28,7 @@ import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.client.TopologyMember;
import org.apache.activemq.artemis.core.server.LiveNodeLocator;
import org.apache.activemq.artemis.utils.ConcurrentUtil;
import org.jboss.logging.Logger;
/**
@ -66,12 +66,14 @@ public class NamedLiveNodeLocatorForScaleDown extends LiveNodeLocator {
if (connectors.isEmpty()) {
try {
if (timeout != -1L) {
if (!condition.await(timeout, TimeUnit.MILLISECONDS)) {
if (!ConcurrentUtil.await(condition, timeout)) {
throw new ActiveMQException("Timeout elapsed while waiting for cluster node");
}
}
else {
condition.await();
while (connectors.isEmpty()) {
condition.await();
}
}
}
catch (InterruptedException e) {
@ -155,4 +157,3 @@ public class NamedLiveNodeLocatorForScaleDown extends LiveNodeLocator {
}
}
}