mirror of https://github.com/apache/nifi.git
NIFI-9371 Removed synchronized keyword from Active Threads methods (#5513)
- ConcurrentHashMap collection for Active Threads eliminates the need for method synchronization - Replaced System.out.println() with Logger.info() in nifi-system-test-suite classes
This commit is contained in:
parent
119ba17bf1
commit
5aced2b4bc
|
@ -1515,18 +1515,18 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private synchronized void activateThread() {
|
private void activateThread() {
|
||||||
final Thread thread = Thread.currentThread();
|
final Thread thread = Thread.currentThread();
|
||||||
final Long timestamp = System.currentTimeMillis();
|
final Long timestamp = System.currentTimeMillis();
|
||||||
activeThreads.put(thread, new ActiveTask(timestamp));
|
activeThreads.put(thread, new ActiveTask(timestamp));
|
||||||
}
|
}
|
||||||
|
|
||||||
private synchronized void deactivateThread() {
|
private void deactivateThread() {
|
||||||
activeThreads.remove(Thread.currentThread());
|
activeThreads.remove(Thread.currentThread());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public synchronized List<ActiveThreadInfo> getActiveThreads(final ThreadDetails threadDetails) {
|
public List<ActiveThreadInfo> getActiveThreads(final ThreadDetails threadDetails) {
|
||||||
final long now = System.currentTimeMillis();
|
final long now = System.currentTimeMillis();
|
||||||
|
|
||||||
final Map<Long, ThreadInfo> threadInfoMap = Stream.of(threadDetails.getThreadInfos())
|
final Map<Long, ThreadInfo> threadInfoMap = Stream.of(threadDetails.getThreadInfos())
|
||||||
|
@ -1550,7 +1550,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public synchronized int getTerminatedThreadCount() {
|
public int getTerminatedThreadCount() {
|
||||||
int count = 0;
|
int count = 0;
|
||||||
for (final ActiveTask task : activeThreads.values()) {
|
for (final ActiveTask task : activeThreads.values()) {
|
||||||
if (task.isTerminated()) {
|
if (task.isTerminated()) {
|
||||||
|
|
|
@ -525,13 +525,13 @@ public class NiFiClientUtil {
|
||||||
while (true) {
|
while (true) {
|
||||||
final List<ControllerServiceEntity> nonDisabledServices = getControllerServicesNotInState(groupId, desiredState, serviceIdsOfInterest);
|
final List<ControllerServiceEntity> nonDisabledServices = getControllerServicesNotInState(groupId, desiredState, serviceIdsOfInterest);
|
||||||
if (nonDisabledServices.isEmpty()) {
|
if (nonDisabledServices.isEmpty()) {
|
||||||
System.out.println(String.format("All Controller Services in Process Group %s now have desired state of %s", groupId, desiredState));
|
logger.info("Process Group [{}] Controller Services have desired state [{}]", groupId, desiredState);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
final ControllerServiceEntity entity = nonDisabledServices.get(0);
|
final ControllerServiceEntity entity = nonDisabledServices.get(0);
|
||||||
System.out.println(String.format("Controller Service with ID %s and type %s has a State of %s while waiting for state of %s; will wait 500 millis and check again", entity.getId(),
|
logger.info("Controller Service ID [{}] Type [{}] State [{}] waiting for State [{}]: sleeping for 500 ms before retrying", entity.getId(),
|
||||||
entity.getComponent().getType(), entity.getComponent().getState(), desiredState));
|
entity.getComponent().getType(), entity.getComponent().getState(), desiredState);
|
||||||
|
|
||||||
try {
|
try {
|
||||||
Thread.sleep(500L);
|
Thread.sleep(500L);
|
||||||
|
|
|
@ -22,6 +22,8 @@ import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClient;
|
||||||
import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientConfig;
|
import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientConfig;
|
||||||
import org.apache.nifi.toolkit.cli.impl.client.nifi.impl.JerseyNiFiClient;
|
import org.apache.nifi.toolkit.cli.impl.client.nifi.impl.JerseyNiFiClient;
|
||||||
import org.apache.nifi.util.file.FileUtils;
|
import org.apache.nifi.util.file.FileUtils;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.io.FileInputStream;
|
import java.io.FileInputStream;
|
||||||
|
@ -38,6 +40,8 @@ import java.util.Properties;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
public class SpawnedStandaloneNiFiInstanceFactory implements NiFiInstanceFactory {
|
public class SpawnedStandaloneNiFiInstanceFactory implements NiFiInstanceFactory {
|
||||||
|
private static final Logger logger = LoggerFactory.getLogger(SpawnedStandaloneNiFiInstanceFactory.class);
|
||||||
|
|
||||||
private final InstanceConfiguration instanceConfig;
|
private final InstanceConfiguration instanceConfig;
|
||||||
|
|
||||||
public SpawnedStandaloneNiFiInstanceFactory(final InstanceConfiguration instanceConfig) {
|
public SpawnedStandaloneNiFiInstanceFactory(final InstanceConfiguration instanceConfig) {
|
||||||
|
@ -88,7 +92,7 @@ public class SpawnedStandaloneNiFiInstanceFactory implements NiFiInstanceFactory
|
||||||
throw new IllegalStateException("NiFi has already been started");
|
throw new IllegalStateException("NiFi has already been started");
|
||||||
}
|
}
|
||||||
|
|
||||||
System.out.println("Starting instance " + instanceDirectory.getName());
|
logger.info("Starting NiFi [{}]", instanceDirectory.getName());
|
||||||
|
|
||||||
try {
|
try {
|
||||||
this.runNiFi = new RunNiFi(bootstrapConfigFile);
|
this.runNiFi = new RunNiFi(bootstrapConfigFile);
|
||||||
|
@ -108,7 +112,7 @@ public class SpawnedStandaloneNiFiInstanceFactory implements NiFiInstanceFactory
|
||||||
}
|
}
|
||||||
|
|
||||||
public void createEnvironment() throws IOException {
|
public void createEnvironment() throws IOException {
|
||||||
System.out.println("Creating environment for instance " + instanceDirectory.getName());
|
logger.info("Creating environment for NiFi [{}]", instanceDirectory.getName());
|
||||||
|
|
||||||
cleanup();
|
cleanup();
|
||||||
|
|
||||||
|
@ -193,7 +197,7 @@ public class SpawnedStandaloneNiFiInstanceFactory implements NiFiInstanceFactory
|
||||||
while (true) {
|
while (true) {
|
||||||
try {
|
try {
|
||||||
client.getFlowClient().getRootGroupId();
|
client.getFlowClient().getRootGroupId();
|
||||||
System.out.println("Completed startup of instance " + instanceDirectory.getName());
|
logger.info("Startup Completed NiFi [{}]", instanceDirectory.getName());
|
||||||
return;
|
return;
|
||||||
} catch (final Exception e) {
|
} catch (final Exception e) {
|
||||||
try {
|
try {
|
||||||
|
@ -212,11 +216,11 @@ public class SpawnedStandaloneNiFiInstanceFactory implements NiFiInstanceFactory
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
System.out.println("Stopping instance " + instanceDirectory.getName());
|
logger.info("Shutdown Started NiFi [{}]", instanceDirectory.getName());
|
||||||
|
|
||||||
try {
|
try {
|
||||||
runNiFi.stop();
|
runNiFi.stop();
|
||||||
System.out.println("Completed shutdown of instance " + instanceDirectory.getName());
|
logger.info("Shutdown Completed NiFi [{}]", instanceDirectory.getName());
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
throw new RuntimeException("Failed to stop NiFi", e);
|
throw new RuntimeException("Failed to stop NiFi", e);
|
||||||
} finally {
|
} finally {
|
||||||
|
|
Loading…
Reference in New Issue