NIFI-259: Bug fixes

This commit is contained in:
Mark Payne 2016-01-21 13:44:44 -05:00
parent 8f9c0b9ca3
commit b07e13a1d8
4 changed files with 53 additions and 39 deletions

View File

@ -421,40 +421,6 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
}
componentStatusSnapshotMillis = snapshotMillis;
Executors.newSingleThreadScheduledExecutor().scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
readLock.lock();
try {
for (final Node node : nodes) {
if (Status.CONNECTED.equals(node.getStatus())) {
ComponentStatusRepository statusRepository = componentMetricsRepositoryMap.get(node.getNodeId());
if (statusRepository == null) {
statusRepository = createComponentStatusRepository();
componentMetricsRepositoryMap.put(node.getNodeId(), statusRepository);
}
// ensure this node has a payload
if (node.getHeartbeat() != null && node.getHeartbeatPayload() != null) {
// if nothing has been captured or the current heartbeat is newer, capture it - comparing the heatbeat created timestamp
// is safe since its marked as XmlTransient so we're assured that its based off the same clock that created the last capture date
if (statusRepository.getLastCaptureDate() == null || node.getHeartbeat().getCreatedTimestamp() > statusRepository.getLastCaptureDate().getTime()) {
statusRepository.capture(node.getHeartbeatPayload().getProcessGroupStatus());
}
}
}
}
} catch (final Throwable t) {
logger.warn("Unable to capture component metrics from Node heartbeats: " + t);
if (logger.isDebugEnabled()) {
logger.warn("", t);
}
} finally {
readLock.unlock("capture component metrics from node heartbeats");
}
}
}, componentStatusSnapshotMillis, componentStatusSnapshotMillis, TimeUnit.MILLISECONDS);
remoteInputPort = properties.getRemoteInputPort();
if (remoteInputPort == null) {
remoteSiteListener = null;
@ -496,6 +462,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
processScheduler.setSchedulingAgent(SchedulingStrategy.CRON_DRIVEN, new QuartzSchedulingAgent(null, reportingTaskEngine, null, encryptor));
processScheduler.setMaxThreadCount(SchedulingStrategy.TIMER_DRIVEN, 10);
processScheduler.setMaxThreadCount(SchedulingStrategy.CRON_DRIVEN, 10);
processScheduler.scheduleFrameworkTask(new CaptureComponentMetrics(), "Capture Component Metrics", componentStatusSnapshotMillis, componentStatusSnapshotMillis, TimeUnit.MILLISECONDS);
controllerServiceProvider = new StandardControllerServiceProvider(processScheduler, bulletinRepository, stateManagerProvider);
}
@ -4595,4 +4562,41 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
public Set<String> getControllerServiceIdentifiers(final Class<? extends ControllerService> serviceType) {
return controllerServiceProvider.getControllerServiceIdentifiers(serviceType);
}
/**
* Captures snapshots of components' metrics
*/
private class CaptureComponentMetrics implements Runnable {
@Override
public void run() {
readLock.lock();
try {
for (final Node node : nodes) {
if (Status.CONNECTED.equals(node.getStatus())) {
ComponentStatusRepository statusRepository = componentMetricsRepositoryMap.get(node.getNodeId());
if (statusRepository == null) {
statusRepository = createComponentStatusRepository();
componentMetricsRepositoryMap.put(node.getNodeId(), statusRepository);
}
// ensure this node has a payload
if (node.getHeartbeat() != null && node.getHeartbeatPayload() != null) {
// if nothing has been captured or the current heartbeat is newer, capture it - comparing the heatbeat created timestamp
// is safe since its marked as XmlTransient so we're assured that its based off the same clock that created the last capture date
if (statusRepository.getLastCaptureDate() == null || node.getHeartbeat().getCreatedTimestamp() > statusRepository.getLastCaptureDate().getTime()) {
statusRepository.capture(node.getHeartbeatPayload().getProcessGroupStatus());
}
}
}
}
} catch (final Throwable t) {
logger.warn("Unable to capture component metrics from Node heartbeats: " + t);
if (logger.isDebugEnabled()) {
logger.warn("", t);
}
} finally {
readLock.unlock("capture component metrics from node heartbeats");
}
}
}
}

View File

@ -114,7 +114,7 @@ public class ZooKeeperStateProvider extends AbstractStateProvider {
private List<ACL> acl;
public ZooKeeperStateProvider() throws Exception {
public ZooKeeperStateProvider() {
}
@ -232,6 +232,7 @@ public class ZooKeeperStateProvider extends AbstractStateProvider {
if (Code.SESSIONEXPIRED == ke.code()) {
invalidateClient();
onComponentRemoved(componentId);
return;
}
throw new IOException("Unable to remove state for component with ID '" + componentId + "' from ZooKeeper", ke);
@ -309,6 +310,7 @@ public class ZooKeeperStateProvider extends AbstractStateProvider {
} catch (final KeeperException ke) {
if (ke.code() == Code.NONODE) {
createNode(path, data);
return;
} else {
throw ke;
}
@ -320,9 +322,11 @@ public class ZooKeeperStateProvider extends AbstractStateProvider {
if (Code.SESSIONEXPIRED == ke.code()) {
invalidateClient();
setState(stateValues, version, componentId);
return;
}
if (Code.NODEEXISTS == ke.code()) {
setState(stateValues, version, componentId);
return;
}
throw new IOException("Failed to set cluster-wide state in ZooKeeper for component with ID " + componentId, ke);
@ -347,16 +351,19 @@ public class ZooKeeperStateProvider extends AbstractStateProvider {
if (Code.SESSIONEXPIRED == ke.code()) {
invalidateClient();
createNode(path, data);
return;
}
// Node already exists. Node must have been created by "someone else". Just set the data.
if (ke.code() == Code.NODEEXISTS) {
try {
getZooKeeper().setData(path, data, -1);
return;
} catch (final KeeperException ke1) {
// Node no longer exists -- it was removed by someone else. Go recreate the node.
if (ke1.code() == Code.NONODE) {
createNode(path, data);
return;
}
} catch (final InterruptedException ie) {
throw new IOException("Failed to update cluster-wide state due to interruption", ie);

View File

@ -69,8 +69,7 @@ public class TestZooKeeperStateProvider extends AbstractTestStateProvider {
this.provider = createProvider(properties);
}
private ZooKeeperStateProvider createProvider(final Map<PropertyDescriptor, String> properties) throws Exception {
final ZooKeeperStateProvider provider = new ZooKeeperStateProvider();
private void initializeProvider(final ZooKeeperStateProvider provider, final Map<PropertyDescriptor, String> properties) throws IOException {
provider.initialize(new StateProviderInitializationContext() {
@Override
public String getIdentifier() {
@ -97,7 +96,11 @@ public class TestZooKeeperStateProvider extends AbstractTestStateProvider {
return null;
}
});
}
private ZooKeeperStateProvider createProvider(final Map<PropertyDescriptor, String> properties) throws Exception {
final ZooKeeperStateProvider provider = new ZooKeeperStateProvider();
initializeProvider(provider, properties);
provider.enable();
return provider;
}

View File

@ -41,7 +41,7 @@ public class StandardComponentStateDAO implements ComponentStateDAO {
return manager.getState(scope);
} catch (final IOException ioe) {
throw new IllegalStateException(String.format("Unable to get the state for the specified component %s: %s", componentId, ioe));
throw new IllegalStateException(String.format("Unable to get the state for the specified component %s: %s", componentId, ioe), ioe);
}
}
@ -56,7 +56,7 @@ public class StandardComponentStateDAO implements ComponentStateDAO {
manager.clear(Scope.CLUSTER);
manager.clear(Scope.LOCAL);
} catch (final IOException ioe) {
throw new IllegalStateException(String.format("Unable to clear the state for the specified component %s: %s", componentId, ioe));
throw new IllegalStateException(String.format("Unable to clear the state for the specified component %s: %s", componentId, ioe), ioe);
}
}