diff --git a/src/main/java/org/apache/commons/lang3/concurrent/AbstractCircuitBreaker.java b/src/main/java/org/apache/commons/lang3/concurrent/AbstractCircuitBreaker.java new file mode 100644 index 000000000..6df864253 --- /dev/null +++ b/src/main/java/org/apache/commons/lang3/concurrent/AbstractCircuitBreaker.java @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.commons.lang3.concurrent; + +import java.beans.PropertyChangeListener; +import java.beans.PropertyChangeSupport; +import java.util.concurrent.atomic.AtomicReference; + +/** + * Base class for circuit breakers. + * + * @param the type of the value monitored by this circuit breaker + */ +public abstract class AbstractCircuitBreaker implements CircuitBreaker { + /** + * The name of the open property as it is passed to registered + * change listeners. + */ + public static final String PROPERTY_NAME = "open"; + + /** The current state of this circuit breaker. */ + protected final AtomicReference state = new AtomicReference(State.CLOSED); + + /** An object for managing change listeners registered at this instance. */ + private final PropertyChangeSupport changeSupport; + + /** + * Creates an {@AbstractCircuitBreaker}. It also creates an internal {@code PropertyChangeSupport}. + */ + public AbstractCircuitBreaker() { + changeSupport = new PropertyChangeSupport(this); + } + + /** + * {@inheritDoc} + */ + public boolean isOpen() { + return isOpen(state.get()); + } + + /** + * {@inheritDoc} + */ + public boolean isClosed() { + return !isOpen(); + } + + /** + * {@inheritDoc} + */ + public abstract boolean checkState(); + + /** + * {@inheritDoc} + */ + public abstract boolean incrementAndCheckState(T increment); + + /** + * {@inheritDoc} + */ + public void close() { + changeState(State.CLOSED); + } + + /** + * {@inheritDoc} + */ + public void open() { + changeState(State.OPEN); + } + + /** + * Converts the given state value to a boolean open property. + * + * @param state the state to be converted + * @return the boolean open flag + */ + protected static boolean isOpen(State state) { + return state == State.OPEN; + } + + /** + * Changes the internal state of this circuit breaker. If there is actually a change + * of the state value, all registered change listeners are notified. + * + * @param newState the new state to be set + */ + protected void changeState(State newState) { + if (state.compareAndSet(newState.oppositeState(), newState)) { + changeSupport.firePropertyChange(PROPERTY_NAME, !isOpen(newState), isOpen(newState)); + } + } + + /** + * Adds a change listener to this circuit breaker. This listener is notified whenever + * the state of this circuit breaker changes. If the listener is + * null, it is silently ignored. + * + * @param listener the listener to be added + */ + public void addChangeListener(PropertyChangeListener listener) { + changeSupport.addPropertyChangeListener(listener); + } + + /** + * Removes the specified change listener from this circuit breaker. + * + * @param listener the listener to be removed + */ + public void removeChangeListener(PropertyChangeListener listener) { + changeSupport.removePropertyChangeListener(listener); + } + + /** + * An internal enumeration representing the different states of a circuit + * breaker. This class also contains some logic for performing state + * transitions. This is done to avoid complex if-conditions in the code of + * {@code CircuitBreaker}. + */ + protected static enum State { + CLOSED { + /** + * {@inheritDoc} + */ + @Override + public State oppositeState() { + return OPEN; + } + }, + + OPEN { + /** + * {@inheritDoc} + */ + @Override + public State oppositeState() { + return CLOSED; + } + }; + + /** + * Returns the opposite state to the represented state. This is useful + * for flipping the current state. + * + * @return the opposite state + */ + public abstract State oppositeState(); + } + +} diff --git a/src/main/java/org/apache/commons/lang3/concurrent/CircuitBreaker.java b/src/main/java/org/apache/commons/lang3/concurrent/CircuitBreaker.java new file mode 100644 index 000000000..780af468d --- /dev/null +++ b/src/main/java/org/apache/commons/lang3/concurrent/CircuitBreaker.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.commons.lang3.concurrent; + +/** + *

+ * An interface describing a Circuit Breaker component. + *

+ *

+ * A circuit breaker can be used to protect an application against unreliable + * services or unexpected load. It typically monitors a specific resource. As long as this + * resource works as expected, it stays in state closed, meaning that the + * resource can be used. If problems are encountered when using the resource, the circuit + * breaker can switch into state open; then access to this resource is + * prohibited. Depending on a concrete implementation, it is possible that the circuit + * breaker switches back to state closed when the resource becomes available + * again. + *

+ *

+ * This interface defines a generic protocol of a circuit breaker component. It should be + * sufficiently generic to be applied to multiple different use cases. + *

+ * + * @param the type of the value monitored by this circuit breaker + */ +public interface CircuitBreaker { + /** + * Returns the current open state of this circuit breaker. A return value of + * true means that the circuit breaker is currently open indicating a + * problem in the monitored sub system. + * + * @return the current open state of this circuit breaker + */ + boolean isOpen(); + + /** + * Returns the current closed state of this circuit breaker. A return value of + * true means that the circuit breaker is currently closed. This + * means that everything is okay with the monitored sub system. + * + * @return the current closed state of this circuit breaker + */ + boolean isClosed(); + + /** + * Checks the state of this circuit breaker and changes it if necessary. The return + * value indicates whether the circuit breaker is now in state {@code CLOSED}; a value + * of true typically means that the current operation can continue. + * + * @return true if the circuit breaker is now closed; + * false otherwise + */ + boolean checkState(); + + /** + * Closes this circuit breaker. Its state is changed to closed. If this circuit + * breaker is already closed, this method has no effect. + */ + void close(); + + /** + * Opens this circuit breaker. Its state is changed to open. Depending on a concrete + * implementation, it may close itself again if the monitored sub system becomes + * available. If this circuit breaker is already open, this method has no effect. + */ + void open(); + + /** + * Increments the monitored value and performs a check of the current state of this + * circuit breaker. This method works like {@link #checkState()}, but the monitored + * value is incremented before the state check is performed. + * + * @param increment value to increment in the monitored value of the circuit breaker + * @return true if the circuit breaker is now closed; + * false otherwise + */ + boolean incrementAndCheckState(T increment); +} diff --git a/src/main/java/org/apache/commons/lang3/concurrent/CircuitBreakingException.java b/src/main/java/org/apache/commons/lang3/concurrent/CircuitBreakingException.java new file mode 100644 index 000000000..e6cbdf784 --- /dev/null +++ b/src/main/java/org/apache/commons/lang3/concurrent/CircuitBreakingException.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.commons.lang3.concurrent; + +/** + *

+ * An exception class used for reporting runtime error conditions related to + * circuit breakers. + *

+ */ +public class CircuitBreakingException extends RuntimeException { + /** + * The serial version UID. + */ + private static final long serialVersionUID = 1408176654686913340L; + + /** + * Creates a new, uninitialized instance of {@code CircuitBreakingException}. + */ + public CircuitBreakingException() { + super(); + } + + /** + * Creates a new instance of {@code CircuitBreakingException} and initializes it with the given message and cause. + * + * @param message the error message + * @param cause the cause of this exception + */ + public CircuitBreakingException(String message, Throwable cause) { + super(message, cause); + } + + /** + * Creates a new instance of {@code CircuitBreakingException} and initializes it with the given message. + * + * @param message the error message + */ + public CircuitBreakingException(String message) { + super(message); + } + + /** + * Creates a new instance of {@code CircuitBreakingException} and initializes it with the given cause. + * + * @param cause the cause of this exception + */ + public CircuitBreakingException(Throwable cause) { + super(cause); + } + +} diff --git a/src/main/java/org/apache/commons/lang3/concurrent/EventCountCircuitBreaker.java b/src/main/java/org/apache/commons/lang3/concurrent/EventCountCircuitBreaker.java new file mode 100644 index 000000000..312c3b4eb --- /dev/null +++ b/src/main/java/org/apache/commons/lang3/concurrent/EventCountCircuitBreaker.java @@ -0,0 +1,567 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.commons.lang3.concurrent; + +import java.util.EnumMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +/** + *

+ * A simple implementation of the Circuit Breaker pattern + * that counts specific events. + *

+ *

+ * A circuit breaker can be used to protect an application against unreliable + * services or unexpected load. A newly created {@code EventCountCircuitBreaker} object is + * initially in state closed meaning that no problem has been detected. When the + * application encounters specific events (like errors or service timeouts), it tells the + * circuit breaker to increment an internal counter. If the number of events reported in a + * specific time interval exceeds a configurable threshold, the circuit breaker changes + * into state open. This means that there is a problem with the associated sub + * system; the application should no longer call it, but give it some time to settle down. + * The circuit breaker can be configured to switch back to closed state after a + * certain time frame if the number of events received goes below a threshold. + *

+ *

+ * When a {@code EventCountCircuitBreaker} object is constructed the following parameters + * can be provided: + *

+ *
    + *
  • A threshold for the number of events that causes a state transition to + * open state. If more events are received in the configured check interval, the + * circuit breaker switches to open state.
  • + *
  • The interval for checks whether the circuit breaker should open. So it is possible + * to specify something like "The circuit breaker should open if more than 10 errors are + * encountered in a minute."
  • + *
  • The same parameters can be specified for automatically closing the circuit breaker + * again, as in "If the number of requests goes down to 100 per minute, the circuit + * breaker should close itself again". Depending on the use case, it may make sense to use + * a slightly lower threshold for closing the circuit breaker than for opening it to avoid + * continuously flipping when the number of events received is close to the threshold.
  • + *
+ *

+ * This class supports the following typical use cases: + *

+ *

+ * Protecting against load peaks + *

+ *

+ * Imagine you have a server which can handle a certain number of requests per minute. + * Suddenly, the number of requests increases significantly - maybe because a connected + * partner system is going mad or due to a denial of service attack. A + * {@code EventCountCircuitBreaker} can be configured to stop the application from + * processing requests when a sudden peak load is detected and to start request processing + * again when things calm down. The following code fragment shows a typical example of + * such a scenario. Here the {@code EventCountCircuitBreaker} allows up to 1000 requests + * per minute before it interferes. When the load goes down again to 800 requests per + * second it switches back to state closed: + *

+ * + *
+ * EventCountCircuitBreaker breaker = new EventCountCircuitBreaker(1000, 1, TimeUnit.MINUTE, 800);
+ * ...
+ * public void handleRequest(Request request) {
+ *     if (breaker.incrementAndCheckState()) {
+ *         // actually handle this request
+ *     } else {
+ *         // do something else, e.g. send an error code
+ *     }
+ * }
+ * 
+ *

+ * Deal with an unreliable service + *

+ *

+ * In this scenario, an application uses an external service which may fail from time to + * time. If there are too many errors, the service is considered down and should not be + * called for a while. This can be achieved using the following pattern - in this concrete + * example we accept up to 5 errors in 2 minutes; if this limit is reached, the service is + * given a rest time of 10 minutes: + *

+ * + *
+ * EventCountCircuitBreaker breaker = new EventCountCircuitBreaker(5, 2, TimeUnit.MINUTE, 5, 10, TimeUnit.MINUTE);
+ * ...
+ * public void handleRequest(Request request) {
+ *     if (breaker.checkState()) {
+ *         try {
+ *             service.doSomething();
+ *         } catch (ServiceException ex) {
+ *             breaker.incrementAndCheckState();
+ *         }
+ *     } else {
+ *         // return an error code, use an alternative service, etc.
+ *     }
+ * }
+ * 
+ *

+ * In addition to automatic state transitions, the state of a circuit breaker can be + * changed manually using the methods {@link #open()} and {@link #close()}. It is also + * possible to register {@code PropertyChangeListener} objects that get notified whenever + * a state transition occurs. This is useful, for instance to directly react on a freshly + * detected error condition. + *

+ *

+ * Implementation notes: + *

+ *
    + *
  • This implementation uses non-blocking algorithms to update the internal counter and + * state. This should be pretty efficient if there is not too much contention.
  • + *
  • This implementation is not intended to operate as a high-precision timer in very + * short check intervals. It is deliberately kept simple to avoid complex and + * time-consuming state checks. It should work well in time intervals from a few seconds + * up to minutes and longer. If the intervals become too short, there might be race + * conditions causing spurious state transitions.
  • + *
  • The handling of check intervals is a bit simplistic. Therefore, there is no + * guarantee that the circuit breaker is triggered at a specific point in time; there may + * be some delay (less than a check interval).
  • + *
+ */ +public class EventCountCircuitBreaker extends AbstractCircuitBreaker { + + /** A map for accessing the strategy objects for the different states. */ + private static final Map STRATEGY_MAP = createStrategyMap(); + + /** Stores information about the current check interval. */ + private final AtomicReference checkIntervalData; + + /** The threshold for opening the circuit breaker. */ + private final int openingThreshold; + + /** The time interval for opening the circuit breaker. */ + private final long openingInterval; + + /** The threshold for closing the circuit breaker. */ + private final int closingThreshold; + + /** The time interval for closing the circuit breaker. */ + private final long closingInterval; + + /** + * Creates a new instance of {@code EventCountCircuitBreaker} and initializes all properties for + * opening and closing it based on threshold values for events occurring in specific + * intervals. + * + * @param openingThreshold the threshold for opening the circuit breaker; if this + * number of events is received in the time span determined by the opening interval, + * the circuit breaker is opened + * @param openingInterval the interval for opening the circuit breaker + * @param openingUnit the {@code TimeUnit} defining the opening interval + * @param closingThreshold the threshold for closing the circuit breaker; if the + * number of events received in the time span determined by the closing interval goes + * below this threshold, the circuit breaker is closed again + * @param closingInterval the interval for closing the circuit breaker + * @param closingUnit the {@code TimeUnit} defining the closing interval + */ + public EventCountCircuitBreaker(int openingThreshold, long openingInterval, + TimeUnit openingUnit, int closingThreshold, long closingInterval, + TimeUnit closingUnit) { + super(); + checkIntervalData = new AtomicReference(new CheckIntervalData(0, 0)); + this.openingThreshold = openingThreshold; + this.openingInterval = openingUnit.toNanos(openingInterval); + this.closingThreshold = closingThreshold; + this.closingInterval = closingUnit.toNanos(closingInterval); + } + + /** + * Creates a new instance of {@code EventCountCircuitBreaker} with the same interval for opening + * and closing checks. + * + * @param openingThreshold the threshold for opening the circuit breaker; if this + * number of events is received in the time span determined by the check interval, the + * circuit breaker is opened + * @param checkInterval the check interval for opening or closing the circuit breaker + * @param checkUnit the {@code TimeUnit} defining the check interval + * @param closingThreshold the threshold for closing the circuit breaker; if the + * number of events received in the time span determined by the check interval goes + * below this threshold, the circuit breaker is closed again + */ + public EventCountCircuitBreaker(int openingThreshold, long checkInterval, TimeUnit checkUnit, + int closingThreshold) { + this(openingThreshold, checkInterval, checkUnit, closingThreshold, checkInterval, + checkUnit); + } + + /** + * Creates a new instance of {@code EventCountCircuitBreaker} which uses the same parameters for + * opening and closing checks. + * + * @param threshold the threshold for changing the status of the circuit breaker; if + * the number of events received in a check interval is greater than this value, the + * circuit breaker is opened; if it is lower than this value, it is closed again + * @param checkInterval the check interval for opening or closing the circuit breaker + * @param checkUnit the {@code TimeUnit} defining the check interval + */ + public EventCountCircuitBreaker(int threshold, long checkInterval, TimeUnit checkUnit) { + this(threshold, checkInterval, checkUnit, threshold); + } + + /** + * Returns the threshold value for opening the circuit breaker. If this number of + * events is received in the time span determined by the opening interval, the circuit + * breaker is opened. + * + * @return the opening threshold + */ + public int getOpeningThreshold() { + return openingThreshold; + } + + /** + * Returns the interval (in nanoseconds) for checking for the opening threshold. + * + * @return the opening check interval + */ + public long getOpeningInterval() { + return openingInterval; + } + + /** + * Returns the threshold value for closing the circuit breaker. If the number of + * events received in the time span determined by the closing interval goes below this + * threshold, the circuit breaker is closed again. + * + * @return the closing threshold + */ + public int getClosingThreshold() { + return closingThreshold; + } + + /** + * Returns the interval (in nanoseconds) for checking for the closing threshold. + * + * @return the opening check interval + */ + public long getClosingInterval() { + return closingInterval; + } + + /** + * {@inheritDoc} This implementation checks the internal event counter against the + * threshold values and the check intervals. This may cause a state change of this + * circuit breaker. + */ + @Override + public boolean checkState() { + return performStateCheck(0); + } + + /** + * {@inheritDoc} + */ + @Override + public boolean incrementAndCheckState(Integer increment) + throws CircuitBreakingException { + return performStateCheck(1); + } + + /** + * Increments the monitored value by 1 and performs a check of the current state of this + * circuit breaker. This method works like {@link #checkState()}, but the monitored + * value is incremented before the state check is performed. + * + * @return true if the circuit breaker is now closed; + * false otherwise + */ + public boolean incrementAndCheckState() { + return incrementAndCheckState(1); + } + + /** + * {@inheritDoc} This circuit breaker may close itself again if the number of events + * received during a check interval goes below the closing threshold. If this circuit + * breaker is already open, this method has no effect, except that a new check + * interval is started. + */ + @Override + public void open() { + super.open(); + checkIntervalData.set(new CheckIntervalData(0, now())); + } + + /** + * {@inheritDoc} A new check interval is started. If too many events are received in + * this interval, the circuit breaker changes again to state open. If this circuit + * breaker is already closed, this method has no effect, except that a new check + * interval is started. + */ + @Override + public void close() { + super.close(); + checkIntervalData.set(new CheckIntervalData(0, now())); + } + + /** + * Actually checks the state of this circuit breaker and executes a state transition + * if necessary. + * + * @param increment the increment for the internal counter + * @return a flag whether the circuit breaker is now closed + */ + private boolean performStateCheck(int increment) { + CheckIntervalData currentData; + CheckIntervalData nextData; + State currentState; + + do { + long time = now(); + currentState = state.get(); + currentData = checkIntervalData.get(); + nextData = nextCheckIntervalData(increment, currentData, currentState, time); + } while (!updateCheckIntervalData(currentData, nextData)); + + // This might cause a race condition if other changes happen in between! + // Refer to the header comment! + if (stateStrategy(currentState).isStateTransition(this, currentData, nextData)) { + currentState = currentState.oppositeState(); + changeStateAndStartNewCheckInterval(currentState); + } + return !isOpen(currentState); + } + + /** + * Updates the {@code CheckIntervalData} object. The current data object is replaced + * by the one modified by the last check. The return value indicates whether this was + * successful. If it is false, another thread interfered, and the + * whole operation has to be redone. + * + * @param currentData the current check data object + * @param nextData the replacing check data object + * @return a flag whether the update was successful + */ + private boolean updateCheckIntervalData(CheckIntervalData currentData, + CheckIntervalData nextData) { + return currentData == nextData + || checkIntervalData.compareAndSet(currentData, nextData); + } + + /** + * Changes the state of this circuit breaker and also initializes a new + * {@code CheckIntervalData} object. + * + * @param newState the new state to be set + */ + private void changeStateAndStartNewCheckInterval(State newState) { + changeState(newState); + checkIntervalData.set(new CheckIntervalData(0, now())); + } + + /** + * Calculates the next {@code CheckIntervalData} object based on the current data and + * the current state. The next data object takes the counter increment and the current + * time into account. + * + * @param increment the increment for the internal counter + * @param currentData the current check data object + * @param currentState the current state of the circuit breaker + * @param time the current time + * @return the updated {@code CheckIntervalData} object + */ + private CheckIntervalData nextCheckIntervalData(int increment, + CheckIntervalData currentData, State currentState, long time) { + CheckIntervalData nextData; + if (stateStrategy(currentState).isCheckIntervalFinished(this, currentData, time)) { + nextData = new CheckIntervalData(increment, time); + } else { + nextData = currentData.increment(increment); + } + return nextData; + } + + /** + * Returns the current time in nanoseconds. This method is used to obtain the current + * time. This is needed to calculate the check intervals correctly. + * + * @return the current time in nanoseconds + */ + long now() { + return System.nanoTime(); + } + + /** + * Returns the {@code StateStrategy} object responsible for the given state. + * + * @param state the state + * @return the corresponding {@code StateStrategy} + * @throws CircuitBreakingException if the strategy cannot be resolved + */ + private static StateStrategy stateStrategy(State state) { + StateStrategy strategy = STRATEGY_MAP.get(state); + return strategy; + } + + /** + * Creates the map with strategy objects. It allows access for a strategy for a given + * state. + * + * @return the strategy map + */ + private static Map createStrategyMap() { + Map map = new EnumMap(State.class); + map.put(State.CLOSED, new StateStrategyClosed()); + map.put(State.OPEN, new StateStrategyOpen()); + return map; + } + + /** + * An internally used data class holding information about the checks performed by + * this class. Basically, the number of received events and the start time of the + * current check interval are stored. + */ + private static class CheckIntervalData { + /** The counter for events. */ + private final int eventCount; + + /** The start time of the current check interval. */ + private final long checkIntervalStart; + + /** + * Creates a new instance of {@code CheckIntervalData}. + * + * @param count the current count value + * @param intervalStart the start time of the check interval + */ + public CheckIntervalData(int count, long intervalStart) { + eventCount = count; + checkIntervalStart = intervalStart; + } + + /** + * Returns the event counter. + * + * @return the number of received events + */ + public int getEventCount() { + return eventCount; + } + + /** + * Returns the start time of the current check interval. + * + * @return the check interval start time + */ + public long getCheckIntervalStart() { + return checkIntervalStart; + } + + /** + * Returns a new instance of {@code CheckIntervalData} with the event counter + * incremented by the given delta. If the delta is 0, this object is returned. + * + * @param delta the delta + * @return the updated instance + */ + public CheckIntervalData increment(int delta) { + return (delta != 0) ? new CheckIntervalData(getEventCount() + delta, + getCheckIntervalStart()) : this; + } + } + + /** + * Internally used class for executing check logic based on the current state of the + * circuit breaker. Having this logic extracted into special classes avoids complex + * if-then-else cascades. + */ + private abstract static class StateStrategy { + /** + * Returns a flag whether the end of the current check interval is reached. + * + * @param breaker the {@code CircuitBreaker} + * @param currentData the current state object + * @param now the current time + * @return a flag whether the end of the current check interval is reached + */ + public boolean isCheckIntervalFinished(EventCountCircuitBreaker breaker, + CheckIntervalData currentData, long now) { + return now - currentData.getCheckIntervalStart() > fetchCheckInterval(breaker); + } + + /** + * Checks whether the specified {@code CheckIntervalData} objects indicate that a + * state transition should occur. Here the logic which checks for thresholds + * depending on the current state is implemented. + * + * @param breaker the {@code CircuitBreaker} + * @param currentData the current {@code CheckIntervalData} object + * @param nextData the updated {@code CheckIntervalData} object + * @return a flag whether a state transition should be performed + */ + public abstract boolean isStateTransition(EventCountCircuitBreaker breaker, + CheckIntervalData currentData, CheckIntervalData nextData); + + /** + * Obtains the check interval to applied for the represented state from the given + * {@code CircuitBreaker}. + * + * @param breaker the {@code CircuitBreaker} + * @return the check interval to be applied + */ + protected abstract long fetchCheckInterval(EventCountCircuitBreaker breaker); + } + + /** + * A specialized {@code StateStrategy} implementation for the state closed. + */ + private static class StateStrategyClosed extends StateStrategy { + + /** + * {@inheritDoc} + */ + @Override + public boolean isStateTransition(EventCountCircuitBreaker breaker, + CheckIntervalData currentData, CheckIntervalData nextData) { + return nextData.getEventCount() > breaker.getOpeningThreshold(); + } + + /** + * {@inheritDoc} + */ + @Override + protected long fetchCheckInterval(EventCountCircuitBreaker breaker) { + return breaker.getOpeningInterval(); + } + } + + /** + * A specialized {@code StateStrategy} implementation for the state open. + */ + private static class StateStrategyOpen extends StateStrategy { + /** + * {@inheritDoc} + */ + @Override + public boolean isStateTransition(EventCountCircuitBreaker breaker, + CheckIntervalData currentData, CheckIntervalData nextData) { + return nextData.getCheckIntervalStart() != currentData + .getCheckIntervalStart() + && currentData.getEventCount() < breaker.getClosingThreshold(); + } + + /** + * {@inheritDoc} + */ + @Override + protected long fetchCheckInterval(EventCountCircuitBreaker breaker) { + return breaker.getClosingInterval(); + } + } + +} diff --git a/src/main/java/org/apache/commons/lang3/concurrent/ThresholdCircuitBreaker.java b/src/main/java/org/apache/commons/lang3/concurrent/ThresholdCircuitBreaker.java new file mode 100644 index 000000000..06b715214 --- /dev/null +++ b/src/main/java/org/apache/commons/lang3/concurrent/ThresholdCircuitBreaker.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.commons.lang3.concurrent; + +import java.util.concurrent.atomic.AtomicLong; + +/** + *

+ * A simple implementation of the Circuit Breaker pattern + * that opens if the requested increment amount is greater than a given threshold. + *

+ * + *

+ * It contains an internal counter that starts in zero, and each call increments the counter by a given amount. + * If the threshold is zero, the circuit breaker will be in a permanent open state. + *

+ * + *

+ * An example of use case could be a memory circuit breaker. + *

+ * + *
+ * long threshold = 10L;
+ * ThresholdCircuitBreaker breaker = new ThresholdCircuitBreaker(10L);
+ * ...
+ * public void handleRequest(Request request) {
+ *     long memoryUsed = estimateMemoryUsage(request);
+ *     if (breaker.incrementAndCheckState(memoryUsed)) {
+ *         // actually handle this request
+ *     } else {
+ *         // do something else, e.g. send an error code
+ *     }
+ * }
+ * 
+ * + *

#Thread safe#

+ */ +public class ThresholdCircuitBreaker extends AbstractCircuitBreaker { + /** + * The initial value of the internal counter. + */ + private final static long INITIAL_COUNT = 0L; + + /** + * The threshold. + */ + private final long threshold; + + /** + * Controls the amount used. + */ + private final AtomicLong used; + + /** + *

Creates a new instance of {@code ThresholdCircuitBreaker} and initializes the threshold.

+ * + * @param threshold the threshold. + */ + public ThresholdCircuitBreaker(long threshold) { + super(); + this.used = new AtomicLong(INITIAL_COUNT); + this.threshold = threshold; + } + + /** + * Gets the threshold. + * + * @return the threshold + */ + public long getThreshold() { + return threshold; + } + + /** + * {@inheritDoc} + */ + @Override + public boolean checkState() throws CircuitBreakingException { + return isOpen(); + } + + /** + * {@inheritDoc} + * + *

Resets the internal counter back to its initial value (zero).

+ */ + @Override + public void close() { + super.close(); + this.used.set(INITIAL_COUNT); + } + + /** + * {@inheritDoc} + * + *

If the threshold is zero, the circuit breaker will be in a permanent open state.

+ */ + @Override + public boolean incrementAndCheckState(Long increment) throws CircuitBreakingException { + if (threshold == 0) { + open(); + } + + long used = this.used.addAndGet(increment); + if (used > threshold) { + open(); + } + + return checkState(); + } + +} diff --git a/src/test/java/org/apache/commons/lang3/concurrent/EventCountCircuitBreakerTest.java b/src/test/java/org/apache/commons/lang3/concurrent/EventCountCircuitBreakerTest.java new file mode 100644 index 000000000..8b685e928 --- /dev/null +++ b/src/test/java/org/apache/commons/lang3/concurrent/EventCountCircuitBreakerTest.java @@ -0,0 +1,401 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.commons.lang3.concurrent; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertTrue; + +import java.beans.PropertyChangeEvent; +import java.beans.PropertyChangeListener; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.junit.Test; + +/** + * Test class for {@code EventCountCircuitBreaker}. + */ +public class EventCountCircuitBreakerTest { + /** Constant for the opening threshold. */ + private static final int OPENING_THRESHOLD = 10; + + /** Constant for the closing threshold. */ + private static final int CLOSING_THRESHOLD = 5; + + /** Constant for the factor for converting nanoseconds. */ + private static final long NANO_FACTOR = 1000L * 1000L * 1000L; + + /** + * Tests that time units are correctly taken into account by constructors. + */ + @Test + public void testIntervalCalculation() { + EventCountCircuitBreaker breaker = new EventCountCircuitBreaker(OPENING_THRESHOLD, 1, + TimeUnit.SECONDS, CLOSING_THRESHOLD, 2, TimeUnit.MILLISECONDS); + assertEquals("Wrong opening interval", NANO_FACTOR, breaker.getOpeningInterval()); + assertEquals("Wrong closing interval", 2 * NANO_FACTOR / 1000, + breaker.getClosingInterval()); + } + + /** + * Tests that the closing interval is the same as the opening interval if it is not + * specified. + */ + @Test + public void testDefaultClosingInterval() { + EventCountCircuitBreaker breaker = new EventCountCircuitBreaker(OPENING_THRESHOLD, 1, + TimeUnit.SECONDS, CLOSING_THRESHOLD); + assertEquals("Wrong closing interval", NANO_FACTOR, breaker.getClosingInterval()); + } + + /** + * Tests that the closing threshold is the same as the opening threshold if not + * specified otherwise. + */ + @Test + public void testDefaultClosingThreshold() { + EventCountCircuitBreaker breaker = new EventCountCircuitBreaker(OPENING_THRESHOLD, 1, + TimeUnit.SECONDS); + assertEquals("Wrong closing interval", NANO_FACTOR, breaker.getClosingInterval()); + assertEquals("Wrong closing threshold", OPENING_THRESHOLD, + breaker.getClosingThreshold()); + } + + /** + * Tests that a circuit breaker is closed after its creation. + */ + @Test + public void testInitiallyClosed() { + EventCountCircuitBreaker breaker = new EventCountCircuitBreaker(OPENING_THRESHOLD, 1, + TimeUnit.SECONDS); + assertFalse("Open", breaker.isOpen()); + assertTrue("Not closed", breaker.isClosed()); + } + + /** + * Tests whether the current time is correctly determined. + */ + @Test + public void testNow() { + EventCountCircuitBreaker breaker = new EventCountCircuitBreaker(OPENING_THRESHOLD, 1, + TimeUnit.SECONDS); + long now = breaker.now(); + long delta = Math.abs(System.nanoTime() - now); + assertTrue("Delta to current time too large", delta < 100000); + } + + /** + * Tests that the circuit breaker stays closed if the number of received events stays + * below the threshold. + */ + @Test + public void testNotOpeningUnderThreshold() { + long startTime = 1000; + EventCountCircuitBreakerTestImpl breaker = new EventCountCircuitBreakerTestImpl(OPENING_THRESHOLD, 1, + TimeUnit.SECONDS, CLOSING_THRESHOLD, 1, TimeUnit.SECONDS); + for (int i = 0; i < OPENING_THRESHOLD - 1; i++) { + assertTrue("In open state", breaker.at(startTime).incrementAndCheckState()); + startTime++; + } + assertTrue("Not closed", breaker.isClosed()); + } + + /** + * Tests that the circuit breaker stays closed if there are a number of received + * events, but not in a single check interval. + */ + @Test + public void testNotOpeningCheckIntervalExceeded() { + long startTime = 0L; + long timeIncrement = 3 * NANO_FACTOR / (2 * OPENING_THRESHOLD); + EventCountCircuitBreakerTestImpl breaker = new EventCountCircuitBreakerTestImpl(OPENING_THRESHOLD, 1, + TimeUnit.SECONDS, CLOSING_THRESHOLD, 1, TimeUnit.SECONDS); + for (int i = 0; i < 5 * OPENING_THRESHOLD; i++) { + assertTrue("In open state", breaker.at(startTime).incrementAndCheckState()); + startTime += timeIncrement; + } + assertTrue("Not closed", breaker.isClosed()); + } + + /** + * Tests that the circuit breaker opens if all conditions are met. + */ + @Test + public void testOpeningWhenThresholdReached() { + long startTime = 0; + long timeIncrement = NANO_FACTOR / OPENING_THRESHOLD - 1; + EventCountCircuitBreakerTestImpl breaker = new EventCountCircuitBreakerTestImpl(OPENING_THRESHOLD, 1, + TimeUnit.SECONDS, CLOSING_THRESHOLD, 1, TimeUnit.SECONDS); + boolean open = false; + for (int i = 0; i < OPENING_THRESHOLD + 1; i++) { + open = !breaker.at(startTime).incrementAndCheckState(); + startTime += timeIncrement; + } + assertTrue("Not open", open); + assertFalse("Closed", breaker.isClosed()); + } + + /** + * Tests that an open circuit breaker does not close itself when the number of events + * received is over the threshold. + */ + @Test + public void testNotClosingOverThreshold() { + EventCountCircuitBreakerTestImpl breaker = new EventCountCircuitBreakerTestImpl(OPENING_THRESHOLD, + 10, TimeUnit.SECONDS, CLOSING_THRESHOLD, 1, TimeUnit.SECONDS); + long startTime = 0; + breaker.open(); + for (int i = 0; i <= CLOSING_THRESHOLD; i++) { + assertFalse("Not open", breaker.at(startTime).incrementAndCheckState()); + startTime += 1000; + } + assertFalse("Closed in new interval", breaker.at(startTime + NANO_FACTOR) + .incrementAndCheckState()); + assertTrue("Not open at end", breaker.isOpen()); + } + + /** + * Tests that the circuit breaker closes automatically if the number of events + * received goes under the closing threshold. + */ + @Test + public void testClosingWhenThresholdReached() { + EventCountCircuitBreakerTestImpl breaker = new EventCountCircuitBreakerTestImpl(OPENING_THRESHOLD, + 10, TimeUnit.SECONDS, CLOSING_THRESHOLD, 1, TimeUnit.SECONDS); + breaker.open(); + breaker.at(1000).incrementAndCheckState(); + assertFalse("Already closed", breaker.at(2000).checkState()); + assertFalse("Closed at interval end", breaker.at(NANO_FACTOR).checkState()); + assertTrue("Not closed after interval end", breaker.at(NANO_FACTOR + 1) + .checkState()); + assertTrue("Not closed at end", breaker.isClosed()); + } + + /** + * Tests whether an explicit open operation fully initializes the internal check data + * object. Otherwise, the circuit breaker may close itself directly afterwards. + */ + @Test + public void testOpenStartsNewCheckInterval() { + EventCountCircuitBreakerTestImpl breaker = new EventCountCircuitBreakerTestImpl(OPENING_THRESHOLD, 2, + TimeUnit.SECONDS, CLOSING_THRESHOLD, 1, TimeUnit.SECONDS); + breaker.at(NANO_FACTOR - 1000).open(); + assertTrue("Not open", breaker.isOpen()); + assertFalse("Already closed", breaker.at(NANO_FACTOR + 100).checkState()); + } + + /** + * Tests whether a new check interval is started if the circuit breaker has a + * transition to open state. + */ + @Test + public void testAutomaticOpenStartsNewCheckInterval() { + EventCountCircuitBreakerTestImpl breaker = new EventCountCircuitBreakerTestImpl(OPENING_THRESHOLD, 2, + TimeUnit.SECONDS, CLOSING_THRESHOLD, 1, TimeUnit.SECONDS); + long time = 10 * NANO_FACTOR; + for (int i = 0; i <= OPENING_THRESHOLD; i++) { + breaker.at(time++).incrementAndCheckState(); + } + assertTrue("Not open", breaker.isOpen()); + time += NANO_FACTOR - 1000; + assertFalse("Already closed", breaker.at(time).incrementAndCheckState()); + time += 1001; + assertTrue("Not closed in time interval", breaker.at(time).checkState()); + } + + /** + * Tests whether the circuit breaker can be closed explicitly. + */ + @Test + public void testClose() { + EventCountCircuitBreakerTestImpl breaker = new EventCountCircuitBreakerTestImpl(OPENING_THRESHOLD, 2, + TimeUnit.SECONDS, CLOSING_THRESHOLD, 1, TimeUnit.SECONDS); + long time = 0; + for (int i = 0; i <= OPENING_THRESHOLD; i++, time += 1000) { + breaker.at(time).incrementAndCheckState(); + } + assertTrue("Not open", breaker.isOpen()); + breaker.close(); + assertTrue("Not closed", breaker.isClosed()); + assertTrue("Open again", breaker.at(time + 1000).incrementAndCheckState()); + } + + /** + * Tests whether events are generated when the state is changed. + */ + @Test + public void testChangeEvents() { + EventCountCircuitBreaker breaker = new EventCountCircuitBreaker(OPENING_THRESHOLD, 1, + TimeUnit.SECONDS); + ChangeListener listener = new ChangeListener(breaker); + breaker.addChangeListener(listener); + breaker.open(); + breaker.close(); + listener.verify(Boolean.TRUE, Boolean.FALSE); + } + + /** + * Tests whether a change listener can be removed. + */ + @Test + public void testRemoveChangeListener() { + EventCountCircuitBreaker breaker = new EventCountCircuitBreaker(OPENING_THRESHOLD, 1, + TimeUnit.SECONDS); + ChangeListener listener = new ChangeListener(breaker); + breaker.addChangeListener(listener); + breaker.open(); + breaker.removeChangeListener(listener); + breaker.close(); + listener.verify(Boolean.TRUE); + } + + /** + * Tests that a state transition triggered by multiple threads is handled correctly. + * Only the first transition should cause an event to be sent. + */ + @Test + public void testStateTransitionGuarded() throws InterruptedException { + final EventCountCircuitBreaker breaker = new EventCountCircuitBreaker(OPENING_THRESHOLD, 1, + TimeUnit.SECONDS); + ChangeListener listener = new ChangeListener(breaker); + breaker.addChangeListener(listener); + + final int threadCount = 128; + final CountDownLatch latch = new CountDownLatch(1); + Thread[] threads = new Thread[threadCount]; + for (int i = 0; i < threadCount; i++) { + threads[i] = new Thread() { + @Override + public void run() { + try { + latch.await(); + } catch (InterruptedException iex) { + // ignore + } + breaker.open(); + } + }; + threads[i].start(); + } + latch.countDown(); + for (Thread thread : threads) { + thread.join(); + } + listener.verify(Boolean.TRUE); + } + + /** + * Tests that automatic state transitions generate change events as well. + */ + @Test + public void testChangeEventsGeneratedByAutomaticTransitions() { + EventCountCircuitBreakerTestImpl breaker = new EventCountCircuitBreakerTestImpl(OPENING_THRESHOLD, 2, + TimeUnit.SECONDS, CLOSING_THRESHOLD, 1, TimeUnit.SECONDS); + ChangeListener listener = new ChangeListener(breaker); + breaker.addChangeListener(listener); + long time = 0; + for (int i = 0; i <= OPENING_THRESHOLD; i++, time += 1000) { + breaker.at(time).incrementAndCheckState(); + } + breaker.at(NANO_FACTOR + 1).checkState(); + breaker.at(3 * NANO_FACTOR).checkState(); + listener.verify(Boolean.TRUE, Boolean.FALSE); + } + + /** + * A test implementation of {@code EventCountCircuitBreaker} which supports mocking the timer. + * This is useful for the creation of deterministic tests for switching the circuit + * breaker's state. + */ + private static class EventCountCircuitBreakerTestImpl extends EventCountCircuitBreaker { + /** The current time in nanoseconds. */ + private long currentTime; + + public EventCountCircuitBreakerTestImpl(int openingThreshold, long openingInterval, + TimeUnit openingUnit, int closingThreshold, long closingInterval, + TimeUnit closingUnit) { + super(openingThreshold, openingInterval, openingUnit, closingThreshold, + closingInterval, closingUnit); + } + + /** + * Sets the current time to be used by this test object for the next operation. + * + * @param time the time to set + * @return a reference to this object + */ + public EventCountCircuitBreakerTestImpl at(long time) { + currentTime = time; + return this; + } + + /** + * {@inheritDoc} This implementation returns the value passed to the {@code at()} + * method. + */ + @Override + long now() { + return currentTime; + } + } + + /** + * A test change listener for checking whether correct change events are generated. + */ + private static class ChangeListener implements PropertyChangeListener { + /** The expected event source. */ + private final Object expectedSource; + + /** A list with the updated values extracted from received change events. */ + private final List changedValues; + + /** + * Creates a new instance of {@code ChangeListener} and sets the expected event + * source. + * + * @param source the expected event source + */ + public ChangeListener(Object source) { + expectedSource = source; + changedValues = new ArrayList(); + } + + public void propertyChange(PropertyChangeEvent evt) { + assertEquals("Wrong event source", expectedSource, evt.getSource()); + assertEquals("Wrong property name", "open", evt.getPropertyName()); + Boolean newValue = (Boolean) evt.getNewValue(); + Boolean oldValue = (Boolean) evt.getOldValue(); + assertNotEquals("Old and new value are equal", newValue, oldValue); + changedValues.add(newValue); + } + + /** + * Verifies that change events for the expected values have been received. + * + * @param values the expected values + */ + public void verify(Boolean... values) { + assertArrayEquals(values, + changedValues.toArray(new Boolean[changedValues.size()])); + } + } +} \ No newline at end of file diff --git a/src/test/java/org/apache/commons/lang3/concurrent/ThresholdCircuitBreakerTest.java b/src/test/java/org/apache/commons/lang3/concurrent/ThresholdCircuitBreakerTest.java new file mode 100644 index 000000000..70f22cd22 --- /dev/null +++ b/src/test/java/org/apache/commons/lang3/concurrent/ThresholdCircuitBreakerTest.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.commons.lang3.concurrent; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import org.junit.Test; + +/** + * Test class for {@code ThresholdCircuitBreaker}. + */ +public class ThresholdCircuitBreakerTest { + + /** + * Threshold used in tests. + */ + private static final long threshold = 10L; + + private static final long zeroThreshold = 0L; + + /** + * Tests that the threshold is working as expected when incremented and no exception is thrown. + */ + @Test + public void testThreshold() { + ThresholdCircuitBreaker circuit = new ThresholdCircuitBreaker(threshold); + circuit.incrementAndCheckState(9L); + assertFalse("Circuit opened before reaching the threshold", circuit.incrementAndCheckState(1L)); + } + + /** + * Tests that exceeding the threshold raises an exception. + */ + @Test + public void testThresholdCircuitBreakingException() { + ThresholdCircuitBreaker circuit = new ThresholdCircuitBreaker(threshold); + circuit.incrementAndCheckState(9L); + assertTrue("The circuit was spposed to be open after increment above the threshold", circuit.incrementAndCheckState(2L)); + } + + /** + * Test that when threshold is zero, the circuit breaker is always open. + */ + @Test + public void testThresholdEqualsZero() { + ThresholdCircuitBreaker circuit = new ThresholdCircuitBreaker(zeroThreshold); + assertTrue("When the threshold is zero, the circuit is supposed to be always open", circuit.incrementAndCheckState(0L)); + } + + /** + * Tests that closing a {@code ThresholdCircuitBreaker} resets the internal counter. + */ + @Test + public void testClosingThresholdCircuitBreaker() { + ThresholdCircuitBreaker circuit = new ThresholdCircuitBreaker(threshold); + circuit.incrementAndCheckState(9L); + circuit.close(); + // now the internal counter is back at zero, not 9 anymore. So it is safe to increment 9 again + assertFalse("Internal counter was not reset back to zero", circuit.incrementAndCheckState(9L)); + } + + /** + * Tests that we can get the threshold value correctly. + */ + @Test + public void testGettingThreshold() { + ThresholdCircuitBreaker circuit = new ThresholdCircuitBreaker(threshold); + assertEquals("Wrong value of threshold", Long.valueOf(threshold), Long.valueOf(circuit.getThreshold())); + } + +}