Skip to content

Commit

Permalink
Added comments to clarify the logic of the response callback
Browse files Browse the repository at this point in the history
  • Loading branch information
nicusX committed Oct 6, 2024
1 parent ed0ce66 commit 1ff0b53
Showing 1 changed file with 37 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.flink.connector.prometheus.sink;

import org.apache.flink.annotation.Internal;
import org.apache.flink.connector.prometheus.sink.PrometheusSinkConfiguration.OnErrorBehavior;
import org.apache.flink.connector.prometheus.sink.errorhandling.PrometheusSinkWriteException;
import org.apache.flink.connector.prometheus.sink.http.RemoteWriteResponseType;
import org.apache.flink.connector.prometheus.sink.metrics.SinkMetricsCallback;
Expand Down Expand Up @@ -56,6 +57,11 @@
* called with an outcome of *completed* either when the request has succeeded or the max retry
* limit has been exceeded. It is responsibility of the callback distinguishing between these
* conditions.
*
* <p>Also, when an exception is thrown after the request is *completed*, for the http client point
* of view (i.e. in the {@link #completed(SimpleHttpResponse)} callback method), it does not
* directly cause the writer to fail until it is intercepted further up the client stack, by the
* {@link org.apache.flink.connector.prometheus.sink.http.RethrowingIOSessionListener}.
*/
@Internal
class HttpResponseCallback implements FutureCallback<SimpleHttpResponse> {
Expand Down Expand Up @@ -83,12 +89,13 @@ public HttpResponseCallback(
}

/**
* The completed outcome is invoked every time the http client successfully receives a valid
* http response, regardless of the status code.
* The completed outcome is invoked when the http client successfully receives a response,
* regardless of the status code.
*
* <p>This method classifies the responses and implements the behaviour expected by the
* Remote-Write specifications. In case of error, the behaviour is determined by the error
* handling configuration.
* <p>This method classifies the responses using {@link
* org.apache.flink.connector.prometheus.sink.http.RemoteWriteResponseClassifier} and implements
* the behaviour expected by the Remote-Write specifications. If the response is classified as
* an error, the behaviour is determined by the error handling configuration.
*/
@Override
public void completed(SimpleHttpResponse response) {
Expand All @@ -97,19 +104,19 @@ public void completed(SimpleHttpResponse response) {

RemoteWriteResponseType responseType = classify(response);
switch (responseType) {
case SUCCESS:
// Response is success
case SUCCESS: // Data successfully written
// Increment successful writes counts
metricsCallback.onSuccessfulWriteRequest(sampleCount);
LOG.debug(
"{},{} - successfully posted {} time-series, containing {} samples",
response.getCode(),
response.getReasonPhrase(),
timeSeriesCount,
sampleCount);
metricsCallback.onSuccessfulWriteRequest(sampleCount);
break;

case FATAL_ERROR:
// Response is a fatal error: throw an exception regardless of configured behaviour
case FATAL_ERROR: // Response is a fatal error
// Throw an exception regardless of configured behaviour
logErrorAndThrow(
new PrometheusSinkWriteException(
"Fatal error response from Prometheus",
Expand All @@ -120,11 +127,10 @@ public void completed(SimpleHttpResponse response) {
response.getBodyText()));
break;

case NON_RETRIABLE_ERROR:
// Response is a non-retriable error.
// Depending on the configured behaviour, log and discard or throw an exception
case NON_RETRIABLE_ERROR: // Response is a non-retriable error.
// If behavior is FAIL, throw an exception
if (errorHandlingBehaviorConfig.getOnPrometheusNonRetriableError()
== PrometheusSinkConfiguration.OnErrorBehavior.FAIL) {
== OnErrorBehavior.FAIL) {
logErrorAndThrow(
new PrometheusSinkWriteException(
"Non-retriable error response from Prometheus",
Expand All @@ -135,21 +141,20 @@ public void completed(SimpleHttpResponse response) {
response.getBodyText()));
}

// Otherwise (DISCARD_AND_CONTINUE), increment discarded data counts & log WARN
metricsCallback.onFailedWriteRequestForNonRetriableError(sampleCount);
LOG.warn(
"{},{} {} (discarded {} time-series, containing {} samples)",
response.getCode(),
response.getReasonPhrase(),
response.getBodyText(),
timeSeriesCount,
sampleCount);
metricsCallback.onFailedWriteRequestForNonRetriableError(sampleCount);
break;

case RETRIABLE_ERROR:
// Retry limit exceeded on retriable error
// Depending on the configured behaviour, log and discard or throw an exception
if (errorHandlingBehaviorConfig.getOnMaxRetryExceeded()
== PrometheusSinkConfiguration.OnErrorBehavior.FAIL) {
case RETRIABLE_ERROR: // Retry limit exceeded on retriable error
// If behavior is FAIL, throw an exception
if (errorHandlingBehaviorConfig.getOnMaxRetryExceeded() == OnErrorBehavior.FAIL) {
logErrorAndThrow(
new PrometheusSinkWriteException(
"Max retry limit exceeded on retriable error",
Expand All @@ -160,18 +165,19 @@ public void completed(SimpleHttpResponse response) {
response.getBodyText()));
}

// Otherwise (DISCARD_AND_CONTINUE), increment discarded data counts & log WARN
metricsCallback.onFailedWriteRequestForRetryLimitExceeded(sampleCount);
LOG.warn(
"{},{} {} (after retry limit reached, discarded {} time-series, containing {} samples)",
response.getCode(),
response.getReasonPhrase(),
response.getBodyText(),
timeSeriesCount,
sampleCount);
metricsCallback.onFailedWriteRequestForRetryLimitExceeded(sampleCount);
break;

default:
// Unexpected/unhandled response outcome: fail
default: // Unexpected/unhandled response outcome
// Always fail
logErrorAndThrow(
new PrometheusSinkWriteException(
"Unexpected status code returned from the remote-write endpoint",
Expand All @@ -183,23 +189,24 @@ public void completed(SimpleHttpResponse response) {
}
}

/**
* Exception reported by the http client (e.g. I/O failure). Always throw an exception.
*
* @param ex exception reported by the http client
*/
@Override
public void failed(Exception ex) {
// General I/O failure reported by http client
// Always fail
throw new PrometheusSinkWriteException("Http client failure", ex);
}

/** The async http client was cancelled. Always throw an exception. */
@Override
public void cancelled() {
// When the async http client is cancelled, the sink always throws an exception
// When the async http client is cancelled, the sink should always throw an exception
throw new PrometheusSinkWriteException("Write request execution cancelled");
}

/**
* Log the exception at ERROR and rethrow. It will be intercepted up the client stack, by the
* {@link org.apache.flink.connector.prometheus.sink.http.RethrowingIOSessionListener}.
*/
/** Log the exception at ERROR and rethrow. */
private void logErrorAndThrow(PrometheusSinkWriteException ex) {
LOG.error("Error condition detected by the http response callback (on complete)", ex);
throw ex;
Expand Down

0 comments on commit 1ff0b53

Please sign in to comment.