diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteInfoProvider.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteInfoProvider.java index 740ac3ac1d..480dc11982 100644 --- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteInfoProvider.java +++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteInfoProvider.java @@ -16,15 +16,6 @@ */ package org.apache.nifi.remote.client; -import org.apache.nifi.remote.TransferDirection; -import org.apache.nifi.remote.protocol.http.HttpProxy; -import org.apache.nifi.remote.util.SiteToSiteRestApiClient; -import org.apache.nifi.web.api.dto.ControllerDTO; -import org.apache.nifi.web.api.dto.PortDTO; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.net.ssl.SSLContext; import java.io.IOException; import java.net.URI; import java.util.HashMap; @@ -34,9 +25,16 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; -public class SiteInfoProvider { +import javax.net.ssl.SSLContext; - private static final Logger logger = LoggerFactory.getLogger(SiteInfoProvider.class); +import org.apache.nifi.events.EventReporter; +import org.apache.nifi.remote.TransferDirection; +import org.apache.nifi.remote.protocol.http.HttpProxy; +import org.apache.nifi.remote.util.SiteToSiteRestApiClient; +import org.apache.nifi.web.api.dto.ControllerDTO; +import org.apache.nifi.web.api.dto.PortDTO; + +public class SiteInfoProvider { private static final long REMOTE_REFRESH_MILLIS = TimeUnit.MILLISECONDS.convert(10, TimeUnit.MINUTES); @@ -60,7 +58,7 @@ public class SiteInfoProvider { private ControllerDTO refreshRemoteInfo() throws IOException { final ControllerDTO controller; - try (final SiteToSiteRestApiClient apiClient = new SiteToSiteRestApiClient(sslContext, proxy)) { + try (final SiteToSiteRestApiClient apiClient = new SiteToSiteRestApiClient(sslContext, proxy, EventReporter.NO_OP)) { apiClient.resolveBaseUrl(clusterUrl); apiClient.setConnectTimeoutMillis(connectTimeoutMillis); apiClient.setReadTimeoutMillis(readTimeoutMillis); diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java index 32d11419f8..9c1efae0cb 100644 --- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java +++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java @@ -157,7 +157,7 @@ public interface SiteToSiteClient extends Closeable { private String truststoreFilename; private String truststorePass; private KeystoreType truststoreType; - private EventReporter eventReporter; + private EventReporter eventReporter = EventReporter.NO_OP; private File peerPersistenceFile; private boolean useCompression; private String portName; diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/http/HttpClient.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/http/HttpClient.java index 8821edb604..75bd3a6841 100644 --- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/http/HttpClient.java +++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/http/HttpClient.java @@ -105,7 +105,7 @@ public class HttpClient extends AbstractSiteToSiteClient implements PeerStatusPr @Override public Set fetchRemotePeerStatuses(PeerDescription peerDescription) throws IOException { // Each node should has the same URL structure and network reach-ability with the proxy configuration. - try (final SiteToSiteRestApiClient apiClient = new SiteToSiteRestApiClient(config.getSslContext(), config.getHttpProxy())) { + try (final SiteToSiteRestApiClient apiClient = new SiteToSiteRestApiClient(config.getSslContext(), config.getHttpProxy(), config.getEventReporter())) { final String scheme = peerDescription.isSecure() ? "https" : "http"; final String clusterApiUrl = apiClient.resolveBaseUrl(scheme, peerDescription.getHostname(), peerDescription.getPort()); @@ -148,7 +148,7 @@ public class HttpClient extends AbstractSiteToSiteClient implements PeerStatusPr } } - final SiteToSiteRestApiClient apiClient = new SiteToSiteRestApiClient(config.getSslContext(), config.getHttpProxy()); + final SiteToSiteRestApiClient apiClient = new SiteToSiteRestApiClient(config.getSslContext(), config.getHttpProxy(), config.getEventReporter()); apiClient.setBaseUrl(peer.getUrl()); apiClient.setConnectTimeoutMillis(timeoutMillis); @@ -192,7 +192,12 @@ public class HttpClient extends AbstractSiteToSiteClient implements PeerStatusPr } }; - transaction.initialize(apiClient, transactionUrl); + try { + transaction.initialize(apiClient, transactionUrl); + } catch (final Exception e) { + transaction.error(); + throw e; + } activeTransactions.add(transaction); return transaction; diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/SiteToSiteRestApiClient.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/SiteToSiteRestApiClient.java index 862f2cdf01..c7249b16f6 100644 --- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/SiteToSiteRestApiClient.java +++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/SiteToSiteRestApiClient.java @@ -55,6 +55,7 @@ import org.apache.http.nio.protocol.HttpAsyncRequestProducer; import org.apache.http.protocol.HttpContext; import org.apache.http.protocol.HttpCoreContext; import org.apache.http.util.EntityUtils; +import org.apache.nifi.events.EventReporter; import org.apache.nifi.remote.Peer; import org.apache.nifi.remote.TransferDirection; import org.apache.nifi.remote.client.http.TransportProtocolVersionNegotiator; @@ -68,6 +69,7 @@ import org.apache.nifi.remote.protocol.CommunicationsSession; import org.apache.nifi.remote.protocol.ResponseCode; import org.apache.nifi.remote.protocol.http.HttpHeaders; import org.apache.nifi.remote.protocol.http.HttpProxy; +import org.apache.nifi.reporting.Severity; import org.apache.nifi.security.util.CertificateUtils; import org.apache.nifi.stream.io.ByteArrayInputStream; import org.apache.nifi.stream.io.ByteArrayOutputStream; @@ -128,6 +130,9 @@ import static org.apache.nifi.remote.protocol.http.HttpHeaders.LOCATION_URI_INTE public class SiteToSiteRestApiClient implements Closeable { + private static final String EVENT_CATEGORY = "Site-to-Site"; + private static final int DATA_PACKET_CHANNEL_READ_BUFFER_SIZE = 16384; + private static final int RESPONSE_CODE_OK = 200; private static final int RESPONSE_CODE_CREATED = 201; private static final int RESPONSE_CODE_ACCEPTED = 202; @@ -140,6 +145,7 @@ public class SiteToSiteRestApiClient implements Closeable { protected final SSLContext sslContext; protected final HttpProxy proxy; private final AtomicBoolean proxyAuthRequiresResend = new AtomicBoolean(false); + private final EventReporter eventReporter; private RequestConfig requestConfig; private CredentialsProvider credentialsProvider; @@ -156,16 +162,21 @@ public class SiteToSiteRestApiClient implements Closeable { private String trustedPeerDn; private final ScheduledExecutorService ttlExtendTaskExecutor; - private ScheduledFuture ttlExtendingThread; + private ScheduledFuture ttlExtendingFuture; private SiteToSiteRestApiClient extendingApiClient; private int connectTimeoutMillis; private int readTimeoutMillis; private static final Pattern HTTP_ABS_URL = Pattern.compile("^https?://.+$"); - public SiteToSiteRestApiClient(final SSLContext sslContext, final HttpProxy proxy) { + private Future postResult; + private CountDownLatch transferDataLatch = new CountDownLatch(1); + + + public SiteToSiteRestApiClient(final SSLContext sslContext, final HttpProxy proxy, final EventReporter eventReporter) { this.sslContext = sslContext; this.proxy = proxy; + this.eventReporter = eventReporter; ttlExtendTaskExecutor = Executors.newScheduledThreadPool(1, new ThreadFactory() { private final ThreadFactory defaultFactory = Executors.defaultThreadFactory(); @@ -295,6 +306,7 @@ public class SiteToSiteRestApiClient implements Closeable { } catch (final CertificateException e) { final String msg = "Could not extract subject DN from SSL session peer certificate"; logger.warn(msg); + eventReporter.reportEvent(Severity.WARNING, EVENT_CATEGORY, msg); throw new SSLPeerUnverifiedException(msg); } } @@ -379,6 +391,7 @@ public class SiteToSiteRestApiClient implements Closeable { throw handleErrResponse(responseCode, content); } } + logger.debug("initiateTransaction handshaking finished, transactionUrl={}", transactionUrl); return transactionUrl; @@ -487,7 +500,9 @@ public class SiteToSiteRestApiClient implements Closeable { @Override public void failed(Exception ex) { - logger.error("Create transaction for {} has failed", post.getURI(), ex); + final String msg = String.format("Failed to create transactino for %s", post.getURI()); + logger.error(msg, ex); + eventReporter.reportEvent(Severity.WARNING, EVENT_CATEGORY, msg); } @Override @@ -620,9 +635,6 @@ public class SiteToSiteRestApiClient implements Closeable { } } - private final int DATA_PACKET_CHANNEL_READ_BUFFER_SIZE = 16384; - private Future postResult; - private CountDownLatch transferDataLatch = new CountDownLatch(1); public void openConnectionForSend(final String transactionUrl, final Peer peer) throws IOException { @@ -717,6 +729,7 @@ public class SiteToSiteRestApiClient implements Closeable { final long totalWritten = commSession.getOutput().getBytesWritten(); logger.debug("sending data to {} has reached to its end. produced {} bytes by reading {} bytes from channel. {} bytes written in this transaction.", flowFilesPath, totalProduced, totalRead, totalWritten); + if (totalRead != totalWritten || totalProduced != totalWritten) { final String msg = "Sending data to %s has reached to its end, but produced : read : wrote byte sizes (%d : %d : %d) were not equal. Something went wrong."; throw new RuntimeException(String.format(msg, flowFilesPath, totalProduced, totalRead, totalWritten)); @@ -751,7 +764,9 @@ public class SiteToSiteRestApiClient implements Closeable { @Override public void failed(final Exception ex) { - logger.error("Sending data to {} has failed", flowFilesPath, ex); + final String msg = String.format("Failed to send data to %s due to %s", flowFilesPath, ex.toString()); + logger.error(msg, ex); + eventReporter.reportEvent(Severity.WARNING, EVENT_CATEGORY, msg); } @Override @@ -842,21 +857,25 @@ public class SiteToSiteRestApiClient implements Closeable { } private void startExtendingTtl(final String transactionUrl, final Closeable stream, final CloseableHttpResponse response) { - if (ttlExtendingThread != null) { + if (ttlExtendingFuture != null) { // Already started. return; } + logger.debug("Starting extending TTL thread..."); - extendingApiClient = new SiteToSiteRestApiClient(sslContext, proxy); + + extendingApiClient = new SiteToSiteRestApiClient(sslContext, proxy, EventReporter.NO_OP); extendingApiClient.transportProtocolVersionNegotiator = this.transportProtocolVersionNegotiator; extendingApiClient.connectTimeoutMillis = this.connectTimeoutMillis; extendingApiClient.readTimeoutMillis = this.readTimeoutMillis; final int extendFrequency = serverTransactionTtl / 2; - ttlExtendingThread = ttlExtendTaskExecutor.scheduleWithFixedDelay(() -> { + + ttlExtendingFuture = ttlExtendTaskExecutor.scheduleWithFixedDelay(() -> { try { extendingApiClient.extendTransaction(transactionUrl); } catch (final Exception e) { logger.warn("Failed to extend transaction ttl", e); + try { // Without disconnecting, Site-to-Site client keep reading data packet, // while server has already rollback. @@ -874,7 +893,7 @@ public class SiteToSiteRestApiClient implements Closeable { closeable.close(); } } catch (final IOException e) { - logger.warn("Got an exception during closing {}: {}", closeable, e.getMessage()); + logger.warn("Got an exception when closing {}: {}", closeable, e.getMessage()); if (logger.isDebugEnabled()) { logger.warn("", e); } @@ -912,9 +931,9 @@ public class SiteToSiteRestApiClient implements Closeable { ttlExtendTaskExecutor.shutdown(); } - if (ttlExtendingThread != null && !ttlExtendingThread.isCancelled()) { + if (ttlExtendingFuture != null && !ttlExtendingFuture.isCancelled()) { logger.debug("Cancelling extending ttl..."); - ttlExtendingThread.cancel(true); + ttlExtendingFuture.cancel(true); } closeSilently(extendingApiClient); @@ -1099,6 +1118,7 @@ public class SiteToSiteRestApiClient implements Closeable { final ObjectMapper mapper = new ObjectMapper(); mapper.configure(DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES, false); + try { return mapper.readValue(responseMessage, entityClass); } catch (JsonParseException e) { diff --git a/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/util/TestSiteToSiteRestApiClient.java b/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/util/TestSiteToSiteRestApiClient.java index 1ae964cc7e..c0e2cf3442 100644 --- a/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/util/TestSiteToSiteRestApiClient.java +++ b/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/util/TestSiteToSiteRestApiClient.java @@ -16,6 +16,7 @@ */ package org.apache.nifi.remote.util; +import org.apache.nifi.events.EventReporter; import org.junit.Assert; import org.junit.Test; @@ -24,7 +25,7 @@ public class TestSiteToSiteRestApiClient { @Test public void testResolveBaseUrlHttp() throws Exception{ - final SiteToSiteRestApiClient apiClient = new SiteToSiteRestApiClient(null, null); + final SiteToSiteRestApiClient apiClient = new SiteToSiteRestApiClient(null, null, EventReporter.NO_OP); final String baseUrl = apiClient.resolveBaseUrl("http://nifi.example.com/nifi"); Assert.assertEquals("http://nifi.example.com/nifi-api", baseUrl); @@ -33,7 +34,7 @@ public class TestSiteToSiteRestApiClient { @Test public void testResolveBaseUrlHttpSub() throws Exception{ - final SiteToSiteRestApiClient apiClient = new SiteToSiteRestApiClient(null, null); + final SiteToSiteRestApiClient apiClient = new SiteToSiteRestApiClient(null, null, EventReporter.NO_OP); final String baseUrl = apiClient.resolveBaseUrl("http://nifi.example.com/foo/bar/baz/nifi"); Assert.assertEquals("http://nifi.example.com/foo/bar/baz/nifi-api", baseUrl); @@ -41,7 +42,7 @@ public class TestSiteToSiteRestApiClient { @Test public void testResolveBaseUrlHttpPort() { - final SiteToSiteRestApiClient apiClient = new SiteToSiteRestApiClient(null, null); + final SiteToSiteRestApiClient apiClient = new SiteToSiteRestApiClient(null, null, EventReporter.NO_OP); final String baseUrl = apiClient.resolveBaseUrl("http://nifi.example.com:8080/nifi"); Assert.assertEquals("http://nifi.example.com:8080/nifi-api", baseUrl); @@ -50,7 +51,7 @@ public class TestSiteToSiteRestApiClient { @Test public void testResolveBaseUrlHttps() throws Exception{ - final SiteToSiteRestApiClient apiClient = new SiteToSiteRestApiClient(null, null); + final SiteToSiteRestApiClient apiClient = new SiteToSiteRestApiClient(null, null, EventReporter.NO_OP); final String baseUrl = apiClient.resolveBaseUrl("https://nifi.example.com/nifi"); Assert.assertEquals("https://nifi.example.com/nifi-api", baseUrl); @@ -58,7 +59,7 @@ public class TestSiteToSiteRestApiClient { @Test public void testResolveBaseUrlHttpsPort() { - final SiteToSiteRestApiClient apiClient = new SiteToSiteRestApiClient(null, null); + final SiteToSiteRestApiClient apiClient = new SiteToSiteRestApiClient(null, null, EventReporter.NO_OP); final String baseUrl = apiClient.resolveBaseUrl("https://nifi.example.com:8443/nifi"); Assert.assertEquals("https://nifi.example.com:8443/nifi-api", baseUrl); diff --git a/nifi-framework-api/src/main/java/org/apache/nifi/events/EventReporter.java b/nifi-framework-api/src/main/java/org/apache/nifi/events/EventReporter.java index d645d60d6e..77bddec8b5 100644 --- a/nifi-framework-api/src/main/java/org/apache/nifi/events/EventReporter.java +++ b/nifi-framework-api/src/main/java/org/apache/nifi/events/EventReporter.java @@ -24,6 +24,15 @@ import org.apache.nifi.reporting.Severity; * Implementations MUST be thread-safe */ public interface EventReporter extends Serializable { + /** + * An Event Reporter that performs no action and ignores all given input + */ + public static final EventReporter NO_OP = new EventReporter() { + @Override + public void reportEvent(Severity severity, String category, String message) { + } + }; void reportEvent(Severity severity, String category, String message); + } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java index 989e8dd0b7..31bfd3dfba 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java @@ -835,8 +835,7 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup { try { // perform the request final ControllerDTO dto; - try ( - final SiteToSiteRestApiClient apiClient = getSiteToSiteRestApiClient()) { + try (final SiteToSiteRestApiClient apiClient = getSiteToSiteRestApiClient()) { dto = apiClient.getController(); } catch (IOException e) { writeLock.lock(); @@ -928,7 +927,7 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup { } private SiteToSiteRestApiClient getSiteToSiteRestApiClient() { - SiteToSiteRestApiClient apiClient = new SiteToSiteRestApiClient(sslContext, new HttpProxy(proxyHost, proxyPort, proxyUser, proxyPassword)); + SiteToSiteRestApiClient apiClient = new SiteToSiteRestApiClient(sslContext, new HttpProxy(proxyHost, proxyPort, proxyUser, proxyPassword), getEventReporter()); apiClient.setBaseUrl(getApiUri()); apiClient.setConnectTimeoutMillis(getCommunicationsTimeout(TimeUnit.MILLISECONDS)); apiClient.setReadTimeoutMillis(getCommunicationsTimeout(TimeUnit.MILLISECONDS));