mirror of https://github.com/apache/nifi.git
NIFI-8405: Added debug logging around how long it takes to establish connections/query dns/read and write headers and body when replication requests; added additional timing around Ranger audits and authorizations and monitoring of long-running tasks because those run often and frequently show up in the logs at the same time as the long requests
This closes #4983 Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
parent
78b69f10dc
commit
14e6dc3dc6
|
@ -0,0 +1,161 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.nifi.cluster.coordination.http.replication.okhttp;
|
||||||
|
|
||||||
|
import okhttp3.Call;
|
||||||
|
|
||||||
|
import java.net.SocketAddress;
|
||||||
|
import java.text.NumberFormat;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
public class CallEventListener {
|
||||||
|
private final Call call;
|
||||||
|
private final Map<String, Timing> dnsTimings = new HashMap<>();
|
||||||
|
private final Map<String, Timing> establishConnectionTiming = new HashMap<>();
|
||||||
|
private long callStart;
|
||||||
|
private long callEnd;
|
||||||
|
private long responseBodyStart;
|
||||||
|
private long responseBodyEnd;
|
||||||
|
private long responseHeaderStart;
|
||||||
|
private long responseHeaderEnd;
|
||||||
|
private long requestHeaderStart;
|
||||||
|
private long requestHeaderEnd;
|
||||||
|
private long requestBodyStart;
|
||||||
|
private long requestBodyEnd;
|
||||||
|
private long secureConnectStart;
|
||||||
|
private long secureConnectEnd;
|
||||||
|
|
||||||
|
|
||||||
|
public CallEventListener(final Call call) {
|
||||||
|
this.call = call;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void callStart() {
|
||||||
|
callStart = System.nanoTime();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void callEnd() {
|
||||||
|
callEnd = System.nanoTime();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void dnsStart(final String domainName) {
|
||||||
|
dnsTimings.computeIfAbsent(domainName, k -> new Timing(domainName)).start();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void dnsEnd(final String domainName) {
|
||||||
|
dnsTimings.computeIfAbsent(domainName, k -> new Timing(domainName)).end();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void responseBodyStart() {
|
||||||
|
responseBodyStart = System.nanoTime();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void responseBodyEnd() {
|
||||||
|
responseBodyEnd = System.nanoTime();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void responseHeaderStart() {
|
||||||
|
responseHeaderStart = System.nanoTime();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void responseHeaderEnd() {
|
||||||
|
responseHeaderEnd = System.nanoTime();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void requestHeaderStart() {
|
||||||
|
requestHeaderStart = System.nanoTime();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void requestHeaderEnd() {
|
||||||
|
requestHeaderEnd = System.nanoTime();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void requestBodyStart() {
|
||||||
|
requestBodyStart = System.nanoTime();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void requestBodyEnd() {
|
||||||
|
requestBodyEnd = System.nanoTime();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void connectStart(final SocketAddress address) {
|
||||||
|
establishConnectionTiming.computeIfAbsent(address.toString(), Timing::new).start();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void connectionAcquired(final SocketAddress address) {
|
||||||
|
establishConnectionTiming.computeIfAbsent(address.toString(), Timing::new).end();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void secureConnectStart() {
|
||||||
|
secureConnectStart = System.nanoTime();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void secureConnectEnd() {
|
||||||
|
secureConnectEnd = System.nanoTime();
|
||||||
|
}
|
||||||
|
|
||||||
|
public Call getCall() {
|
||||||
|
return call;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
final NumberFormat numberFormat = NumberFormat.getInstance();
|
||||||
|
|
||||||
|
return "CallEventListener{" +
|
||||||
|
"url=" + call.request().url() +
|
||||||
|
", dnsTimings=" + dnsTimings.values() +
|
||||||
|
", establishConnectionTiming=" + establishConnectionTiming.values() +
|
||||||
|
", tlsInitialization=" + numberFormat.format(secureConnectEnd - secureConnectStart) + " nanos" +
|
||||||
|
", writeRequestHeaders=" + numberFormat.format(requestHeaderEnd - requestHeaderStart) + " nanos" +
|
||||||
|
", writeRequestBody=" + numberFormat.format(requestBodyEnd - requestBodyStart) + " nanos" +
|
||||||
|
", readResponseHeaders=" + numberFormat.format(responseHeaderEnd - responseHeaderStart) + " nanos" +
|
||||||
|
", readResponseBody=" + numberFormat.format(responseBodyEnd - responseBodyStart) + " nanos" +
|
||||||
|
", callTime=" + numberFormat.format(callEnd - callStart) + " nanos" +
|
||||||
|
'}';
|
||||||
|
}
|
||||||
|
|
||||||
|
private static class Timing {
|
||||||
|
private final String address;
|
||||||
|
private long start;
|
||||||
|
private long nanos;
|
||||||
|
|
||||||
|
public Timing(final String address) {
|
||||||
|
this.address = address;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getAddress() {
|
||||||
|
return address;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void start() {
|
||||||
|
start = System.nanoTime();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void end() {
|
||||||
|
if (start > 0) {
|
||||||
|
nanos += (System.nanoTime() - start);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public String toString() {
|
||||||
|
return "{address=" + address + ", nanos=" + NumberFormat.getInstance().format(nanos) + "}";
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -21,29 +21,6 @@ import com.fasterxml.jackson.annotation.JsonInclude.Include;
|
||||||
import com.fasterxml.jackson.annotation.JsonInclude.Value;
|
import com.fasterxml.jackson.annotation.JsonInclude.Value;
|
||||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
import com.fasterxml.jackson.module.jaxb.JaxbAnnotationIntrospector;
|
import com.fasterxml.jackson.module.jaxb.JaxbAnnotationIntrospector;
|
||||||
import java.io.ByteArrayInputStream;
|
|
||||||
import java.io.ByteArrayOutputStream;
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.io.InputStream;
|
|
||||||
import java.io.OutputStream;
|
|
||||||
import java.net.URI;
|
|
||||||
import java.util.HashMap;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.Map;
|
|
||||||
import java.util.Map.Entry;
|
|
||||||
import java.util.Objects;
|
|
||||||
import java.util.Set;
|
|
||||||
import java.util.concurrent.TimeUnit;
|
|
||||||
import java.util.stream.Collectors;
|
|
||||||
import java.util.stream.Stream;
|
|
||||||
import java.util.zip.GZIPInputStream;
|
|
||||||
import javax.net.ssl.SSLContext;
|
|
||||||
import javax.net.ssl.TrustManager;
|
|
||||||
import javax.net.ssl.X509TrustManager;
|
|
||||||
import javax.ws.rs.HttpMethod;
|
|
||||||
import javax.ws.rs.core.MultivaluedHashMap;
|
|
||||||
import javax.ws.rs.core.MultivaluedMap;
|
|
||||||
import javax.ws.rs.core.Response;
|
|
||||||
import okhttp3.Call;
|
import okhttp3.Call;
|
||||||
import okhttp3.ConnectionPool;
|
import okhttp3.ConnectionPool;
|
||||||
import okhttp3.Headers;
|
import okhttp3.Headers;
|
||||||
|
@ -66,6 +43,30 @@ import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
import org.springframework.util.StreamUtils;
|
import org.springframework.util.StreamUtils;
|
||||||
|
|
||||||
|
import javax.net.ssl.SSLContext;
|
||||||
|
import javax.net.ssl.TrustManager;
|
||||||
|
import javax.net.ssl.X509TrustManager;
|
||||||
|
import javax.ws.rs.HttpMethod;
|
||||||
|
import javax.ws.rs.core.MultivaluedHashMap;
|
||||||
|
import javax.ws.rs.core.MultivaluedMap;
|
||||||
|
import javax.ws.rs.core.Response;
|
||||||
|
import java.io.ByteArrayInputStream;
|
||||||
|
import java.io.ByteArrayOutputStream;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.InputStream;
|
||||||
|
import java.io.OutputStream;
|
||||||
|
import java.net.URI;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Map.Entry;
|
||||||
|
import java.util.Objects;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
import java.util.stream.Stream;
|
||||||
|
import java.util.zip.GZIPInputStream;
|
||||||
|
|
||||||
public class OkHttpReplicationClient implements HttpReplicationClient {
|
public class OkHttpReplicationClient implements HttpReplicationClient {
|
||||||
private static final Logger logger = LoggerFactory.getLogger(OkHttpReplicationClient.class);
|
private static final Logger logger = LoggerFactory.getLogger(OkHttpReplicationClient.class);
|
||||||
private static final Set<String> gzipEncodings = Stream.of("gzip", "x-gzip").collect(Collectors.toSet());
|
private static final Set<String> gzipEncodings = Stream.of("gzip", "x-gzip").collect(Collectors.toSet());
|
||||||
|
@ -317,6 +318,7 @@ public class OkHttpReplicationClient implements HttpReplicationClient {
|
||||||
okHttpClientBuilder.followRedirects(true);
|
okHttpClientBuilder.followRedirects(true);
|
||||||
final int connectionPoolSize = properties.getClusterNodeMaxConcurrentRequests();
|
final int connectionPoolSize = properties.getClusterNodeMaxConcurrentRequests();
|
||||||
okHttpClientBuilder.connectionPool(new ConnectionPool(connectionPoolSize, 5, TimeUnit.MINUTES));
|
okHttpClientBuilder.connectionPool(new ConnectionPool(connectionPoolSize, 5, TimeUnit.MINUTES));
|
||||||
|
okHttpClientBuilder.eventListener(new RequestReplicationEventListener());
|
||||||
|
|
||||||
// Apply the TLS configuration, if present
|
// Apply the TLS configuration, if present
|
||||||
try {
|
try {
|
||||||
|
|
|
@ -0,0 +1,174 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.nifi.cluster.coordination.http.replication.okhttp;
|
||||||
|
|
||||||
|
import okhttp3.Call;
|
||||||
|
import okhttp3.Connection;
|
||||||
|
import okhttp3.EventListener;
|
||||||
|
import okhttp3.Handshake;
|
||||||
|
import okhttp3.Request;
|
||||||
|
import okhttp3.Response;
|
||||||
|
import org.jetbrains.annotations.NotNull;
|
||||||
|
import org.jetbrains.annotations.Nullable;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.net.InetAddress;
|
||||||
|
import java.net.InetSocketAddress;
|
||||||
|
import java.net.Proxy;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
import java.util.concurrent.ConcurrentMap;
|
||||||
|
|
||||||
|
public class RequestReplicationEventListener extends EventListener {
|
||||||
|
private static final Logger logger = LoggerFactory.getLogger(RequestReplicationEventListener.class);
|
||||||
|
|
||||||
|
private final ConcurrentMap<Call, CallEventListener> eventListeners = new ConcurrentHashMap<>();
|
||||||
|
|
||||||
|
private CallEventListener getListener(final Call call) {
|
||||||
|
return eventListeners.computeIfAbsent(call, CallEventListener::new);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void dnsStart(@NotNull final Call call, @NotNull final String domainName) {
|
||||||
|
super.dnsStart(call, domainName);
|
||||||
|
getListener(call).dnsStart(domainName);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void dnsEnd(@NotNull final Call call, @NotNull final String domainName, @NotNull final List<InetAddress> inetAddressList) {
|
||||||
|
super.dnsEnd(call, domainName, inetAddressList);
|
||||||
|
getListener(call).dnsEnd(domainName);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void callStart(@NotNull final Call call) {
|
||||||
|
super.callStart(call);
|
||||||
|
getListener(call).callStart();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void callEnd(@NotNull final Call call) {
|
||||||
|
super.callEnd(call);
|
||||||
|
final CallEventListener callListener = getListener(call);
|
||||||
|
callListener.callEnd();
|
||||||
|
|
||||||
|
logTimingInfo(callListener);
|
||||||
|
eventListeners.remove(call);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void callFailed(@NotNull final Call call, @NotNull final IOException ioe) {
|
||||||
|
super.callFailed(call, ioe);
|
||||||
|
|
||||||
|
final CallEventListener callListener = getListener(call);
|
||||||
|
callListener.callEnd();
|
||||||
|
|
||||||
|
logTimingInfo(callListener);
|
||||||
|
eventListeners.remove(call);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void responseBodyStart(@NotNull final Call call) {
|
||||||
|
super.responseBodyStart(call);
|
||||||
|
getListener(call).responseBodyStart();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void responseBodyEnd(@NotNull final Call call, final long byteCount) {
|
||||||
|
super.responseBodyEnd(call, byteCount);
|
||||||
|
getListener(call).responseBodyEnd();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void responseFailed(@NotNull final Call call, @NotNull final IOException ioe) {
|
||||||
|
super.responseFailed(call, ioe);
|
||||||
|
getListener(call).responseBodyEnd();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void responseHeadersStart(@NotNull final Call call) {
|
||||||
|
super.responseHeadersStart(call);
|
||||||
|
getListener(call).responseHeaderStart();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void responseHeadersEnd(@NotNull final Call call, @NotNull final Response response) {
|
||||||
|
super.responseHeadersEnd(call, response);
|
||||||
|
getListener(call).responseHeaderEnd();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void requestHeadersStart(@NotNull final Call call) {
|
||||||
|
super.requestHeadersStart(call);
|
||||||
|
getListener(call).requestHeaderStart();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void requestHeadersEnd(@NotNull final Call call, @NotNull final Request request) {
|
||||||
|
super.requestHeadersEnd(call, request);
|
||||||
|
getListener(call).requestHeaderEnd();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void requestBodyStart(@NotNull final Call call) {
|
||||||
|
super.requestBodyStart(call);
|
||||||
|
getListener(call).requestBodyStart();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void requestBodyEnd(@NotNull final Call call, final long byteCount) {
|
||||||
|
super.requestBodyEnd(call, byteCount);
|
||||||
|
getListener(call).requestBodyEnd();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void requestFailed(@NotNull final Call call, @NotNull final IOException ioe) {
|
||||||
|
super.requestFailed(call, ioe);
|
||||||
|
getListener(call).requestBodyEnd();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void connectStart(@NotNull final Call call, @NotNull final InetSocketAddress inetSocketAddress, @NotNull final Proxy proxy) {
|
||||||
|
super.connectStart(call, inetSocketAddress, proxy);
|
||||||
|
getListener(call).connectStart(inetSocketAddress);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void connectionAcquired(@NotNull final Call call, @NotNull final Connection connection) {
|
||||||
|
super.connectionAcquired(call, connection);
|
||||||
|
getListener(call).connectionAcquired(connection.socket().getRemoteSocketAddress());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void secureConnectStart(@NotNull final Call call) {
|
||||||
|
super.secureConnectStart(call);
|
||||||
|
getListener(call).secureConnectStart();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void secureConnectEnd(@NotNull final Call call, @Nullable final Handshake handshake) {
|
||||||
|
super.secureConnectEnd(call, handshake);
|
||||||
|
getListener(call).secureConnectEnd();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void logTimingInfo(final CallEventListener eventListener) {
|
||||||
|
logger.debug("Timing information {}", eventListener);
|
||||||
|
}
|
||||||
|
}
|
|
@ -26,6 +26,7 @@ import org.apache.nifi.reporting.Severity;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import java.text.NumberFormat;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
public class LongRunningTaskMonitor implements Runnable {
|
public class LongRunningTaskMonitor implements Runnable {
|
||||||
|
@ -45,6 +46,7 @@ public class LongRunningTaskMonitor implements Runnable {
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
getLogger().debug("Checking long running processor tasks...");
|
getLogger().debug("Checking long running processor tasks...");
|
||||||
|
final long start = System.nanoTime();
|
||||||
|
|
||||||
int activeThreadCount = 0;
|
int activeThreadCount = 0;
|
||||||
int longRunningThreadCount = 0;
|
int longRunningThreadCount = 0;
|
||||||
|
@ -73,7 +75,8 @@ public class LongRunningTaskMonitor implements Runnable {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
getLogger().info("Active threads: {}; Long running threads: {}", activeThreadCount, longRunningThreadCount);
|
final long nanos = System.nanoTime() - start;
|
||||||
|
getLogger().info("Active threads: {}; Long running threads: {}; time to check: {} nanos", activeThreadCount, longRunningThreadCount, NumberFormat.getInstance().format(nanos));
|
||||||
}
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
|
|
|
@ -32,11 +32,11 @@ import org.slf4j.Logger;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
import static org.mockito.ArgumentMatchers.eq;
|
import static org.mockito.ArgumentMatchers.eq;
|
||||||
import static org.mockito.Mockito.mock;
|
import static org.mockito.Mockito.mock;
|
||||||
import static org.mockito.Mockito.times;
|
import static org.mockito.Mockito.times;
|
||||||
import static org.mockito.Mockito.verify;
|
import static org.mockito.Mockito.verify;
|
||||||
import static org.mockito.Mockito.verifyNoMoreInteractions;
|
|
||||||
import static org.mockito.Mockito.when;
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
public class LongRunningTaskMonitorTest {
|
public class LongRunningTaskMonitorTest {
|
||||||
|
@ -45,7 +45,6 @@ public class LongRunningTaskMonitorTest {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void test() {
|
public void test() {
|
||||||
// GIVEN
|
|
||||||
ThreadDetails threadDetails = mock(ThreadDetails.class);
|
ThreadDetails threadDetails = mock(ThreadDetails.class);
|
||||||
|
|
||||||
ActiveThreadInfo activeThreadInfo11 = mockActiveThreadInfo("Thread-11", 60_000);
|
ActiveThreadInfo activeThreadInfo11 = mockActiveThreadInfo("Thread-11", 60_000);
|
||||||
|
@ -82,12 +81,8 @@ public class LongRunningTaskMonitorTest {
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
// WHEN
|
|
||||||
longRunningTaskMonitor.run();
|
longRunningTaskMonitor.run();
|
||||||
|
|
||||||
// THEN
|
|
||||||
verify(longRunningTaskMonitorLogger).debug("Checking long running processor tasks...");
|
|
||||||
|
|
||||||
ArgumentCaptor<String> logMessages = ArgumentCaptor.forClass(String.class);
|
ArgumentCaptor<String> logMessages = ArgumentCaptor.forClass(String.class);
|
||||||
verify(longRunningTaskMonitorLogger, times(2)).warn(logMessages.capture());
|
verify(longRunningTaskMonitorLogger, times(2)).warn(logMessages.capture());
|
||||||
assertEquals("Long running task detected on processor [id=Processor-1-ID, name=Processor-1-Name, type=Processor-1-Type]. Task time: 60 seconds. Stack trace:\n" + STACKTRACE,
|
assertEquals("Long running task detected on processor [id=Processor-1-ID, name=Processor-1-Name, type=Processor-1-Type]. Task time: 60 seconds. Stack trace:\n" + STACKTRACE,
|
||||||
|
@ -97,18 +92,18 @@ public class LongRunningTaskMonitorTest {
|
||||||
|
|
||||||
ArgumentCaptor<String> controllerBulletinMessages = ArgumentCaptor.forClass(String.class);
|
ArgumentCaptor<String> controllerBulletinMessages = ArgumentCaptor.forClass(String.class);
|
||||||
verify(eventReporter, times(2)).reportEvent(eq(Severity.WARNING), eq("Long Running Task"), controllerBulletinMessages.capture());
|
verify(eventReporter, times(2)).reportEvent(eq(Severity.WARNING), eq("Long Running Task"), controllerBulletinMessages.capture());
|
||||||
assertEquals("Processor with ID Processor-1-ID, Name Processor-1-Name and Type Processor-1-Type has a task that has been running for 60 seconds (thread name: Thread-12).",
|
|
||||||
controllerBulletinMessages.getAllValues().get(0));
|
|
||||||
assertEquals("Processor with ID Processor-2-ID, Name Processor-2-Name and Type Processor-2-Type has a task that has been running for 1,000 seconds (thread name: Thread-21).",
|
|
||||||
controllerBulletinMessages.getAllValues().get(1));
|
|
||||||
|
|
||||||
verify(processorLogger1).warn("The processor has a task that has been running for 60 seconds (thread name: Thread-12).");
|
final String firstBulletinMessage = controllerBulletinMessages.getAllValues().get(0);
|
||||||
|
assertTrue(firstBulletinMessage.contains("Processor-1-ID"));
|
||||||
|
assertTrue(firstBulletinMessage.contains("Processor-1-Type"));
|
||||||
|
assertTrue(firstBulletinMessage.contains("Processor-1-Name"));
|
||||||
|
assertTrue(firstBulletinMessage.contains("Thread-12"));
|
||||||
|
|
||||||
verify(processorLogger2).warn("The processor has a task that has been running for 1,000 seconds (thread name: Thread-21).");
|
final String secondBulletinMessage = controllerBulletinMessages.getAllValues().get(1);
|
||||||
|
assertTrue(secondBulletinMessage.contains("Processor-2-ID"));
|
||||||
verify(longRunningTaskMonitorLogger).info("Active threads: {}; Long running threads: {}", 4, 2);
|
assertTrue(secondBulletinMessage.contains("Processor-2-Type"));
|
||||||
|
assertTrue(secondBulletinMessage.contains("Processor-2-Name"));
|
||||||
verifyNoMoreInteractions(longRunningTaskMonitorLogger, eventReporter, processorLogger1, processorLogger2);
|
assertTrue(secondBulletinMessage.contains("Thread-21"));
|
||||||
}
|
}
|
||||||
|
|
||||||
private ActiveThreadInfo mockActiveThreadInfo(String threadName, long activeMillis) {
|
private ActiveThreadInfo mockActiveThreadInfo(String threadName, long activeMillis) {
|
||||||
|
|
|
@ -46,6 +46,7 @@ import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.net.MalformedURLException;
|
import java.net.MalformedURLException;
|
||||||
|
import java.text.NumberFormat;
|
||||||
import java.util.Date;
|
import java.util.Date;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
@ -55,7 +56,6 @@ import java.util.WeakHashMap;
|
||||||
* Authorizer implementation that uses Apache Ranger to make authorization decisions.
|
* Authorizer implementation that uses Apache Ranger to make authorization decisions.
|
||||||
*/
|
*/
|
||||||
public class RangerNiFiAuthorizer implements Authorizer, AuthorizationAuditor {
|
public class RangerNiFiAuthorizer implements Authorizer, AuthorizationAuditor {
|
||||||
|
|
||||||
private static final Logger logger = LoggerFactory.getLogger(RangerNiFiAuthorizer.class);
|
private static final Logger logger = LoggerFactory.getLogger(RangerNiFiAuthorizer.class);
|
||||||
|
|
||||||
static final String RANGER_AUDIT_PATH_PROP = "Ranger Audit Config Path";
|
static final String RANGER_AUDIT_PATH_PROP = "Ranger Audit Config Path";
|
||||||
|
@ -79,6 +79,7 @@ public class RangerNiFiAuthorizer implements Authorizer, AuthorizationAuditor {
|
||||||
private volatile String rangerAdminIdentity = null;
|
private volatile String rangerAdminIdentity = null;
|
||||||
private volatile boolean rangerKerberosEnabled = false;
|
private volatile boolean rangerKerberosEnabled = false;
|
||||||
private volatile NiFiProperties nifiProperties;
|
private volatile NiFiProperties nifiProperties;
|
||||||
|
private final NumberFormat numberFormat = NumberFormat.getInstance();
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void initialize(AuthorizerInitializationContext initializationContext) throws AuthorizerCreationException {
|
public void initialize(AuthorizerInitializationContext initializationContext) throws AuthorizerCreationException {
|
||||||
|
@ -177,7 +178,10 @@ public class RangerNiFiAuthorizer implements Authorizer, AuthorizationAuditor {
|
||||||
rangerRequest.setClientIPAddress(clientIp);
|
rangerRequest.setClientIPAddress(clientIp);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
final long authStart = System.nanoTime();
|
||||||
final RangerAccessResult result = nifiPlugin.isAccessAllowed(rangerRequest);
|
final RangerAccessResult result = nifiPlugin.isAccessAllowed(rangerRequest);
|
||||||
|
final long authNanos = System.nanoTime() - authStart;
|
||||||
|
logger.debug("Performed authorization against Ranger for Resource ID {}, Identity {} in {} nanos", resourceIdentifier, identity, numberFormat.format(authNanos));
|
||||||
|
|
||||||
// store the result for auditing purposes later if appropriate
|
// store the result for auditing purposes later if appropriate
|
||||||
if (request.isAccessAttempt()) {
|
if (request.isAccessAttempt()) {
|
||||||
|
@ -223,7 +227,10 @@ public class RangerNiFiAuthorizer implements Authorizer, AuthorizationAuditor {
|
||||||
event.setResourceType(RANGER_NIFI_RESOURCE_NAME);
|
event.setResourceType(RANGER_NIFI_RESOURCE_NAME);
|
||||||
event.setResourcePath(request.getRequestedResource().getIdentifier());
|
event.setResourcePath(request.getRequestedResource().getIdentifier());
|
||||||
|
|
||||||
|
final long start = System.nanoTime();
|
||||||
defaultAuditHandler.logAuthzAudit(event);
|
defaultAuditHandler.logAuthzAudit(event);
|
||||||
|
final long nanos = System.nanoTime() - start;
|
||||||
|
logger.debug("Logged authorization audits to Ranger in {} nanos", numberFormat.format(nanos));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue