分享免费的编程资源和教程

网站首页 > 技术教程 正文

聊聊ribbon的retry ribbit nation

goqiw 2024-09-25 20:12:20 技术教程 26 ℃ 0 评论

本文主要研究一下ribbon的retry

配置

HttpClientRibbonConfiguration

spring-cloud-netflix-ribbon-2.0.0.RELEASE-sources.jar!/org/springframework/cloud/netflix/ribbon/apache/HttpClientRibbonConfiguration.java

@Bean

@ConditionalOnMissingBean(AbstractLoadBalancerAwareClient.class)

@ConditionalOnClass(name = "org.springframework.retry.support.RetryTemplate")

public RetryableRibbonLoadBalancingHttpClient retryableRibbonLoadBalancingHttpClient(

IClientConfig config, ServerIntrospector serverIntrospector,

ILoadBalancer loadBalancer, RetryHandler retryHandler,

LoadBalancedRetryFactory loadBalancedRetryFactory, CloseableHttpClient httpClient) {

RetryableRibbonLoadBalancingHttpClient client = new RetryableRibbonLoadBalancingHttpClient(

httpClient, config, serverIntrospector, loadBalancedRetryFactory);

client.setLoadBalancer(loadBalancer);

client.setRetryHandler(retryHandler);

Monitors.registerObject("Client_" + this.name, client);

return client;

}

OkHttpRibbonConfiguration

spring-cloud-netflix-ribbon-2.0.0.RELEASE-sources.jar!/org/springframework/cloud/netflix/ribbon/okhttp/OkHttpRibbonConfiguration.java

@Bean

@ConditionalOnMissingBean(AbstractLoadBalancerAwareClient.class)

@ConditionalOnClass(name = "org.springframework.retry.support.RetryTemplate")

public RetryableOkHttpLoadBalancingClient retryableOkHttpLoadBalancingClient(

IClientConfig config,

ServerIntrospector serverIntrospector,

ILoadBalancer loadBalancer,

RetryHandler retryHandler,

LoadBalancedRetryFactory loadBalancedRetryFactory,

OkHttpClient delegate) {

RetryableOkHttpLoadBalancingClient client = new RetryableOkHttpLoadBalancingClient(delegate, config,

serverIntrospector, loadBalancedRetryFactory);

client.setLoadBalancer(loadBalancer);

client.setRetryHandler(retryHandler);

Monitors.registerObject("Client_" + this.name, client);

return client;

}

RetryableRibbonLoadBalancingHttpClient

spring-cloud-netflix-ribbon-2.0.0.RELEASE-sources.jar!/org/springframework/cloud/netflix/ribbon/apache/RetryableRibbonLoadBalancingHttpClient.java

/**

* An Apache HTTP client which leverages Spring Retry to retry failed requests.

* @author Ryan Baxter

* @author Gang Li

*/

public class RetryableRibbonLoadBalancingHttpClient extends RibbonLoadBalancingHttpClient {

private LoadBalancedRetryFactory loadBalancedRetryFactory;

public RetryableRibbonLoadBalancingHttpClient(CloseableHttpClient delegate,

IClientConfig config, ServerIntrospector serverIntrospector,

LoadBalancedRetryFactory loadBalancedRetryFactory) {

super(delegate, config, serverIntrospector);

this.loadBalancedRetryFactory = loadBalancedRetryFactory;

}

@Override

public RibbonApacheHttpResponse execute(final RibbonApacheHttpRequest request, final IClientConfig configOverride) throws Exception {

final RequestConfig.Builder builder = RequestConfig.custom();

IClientConfig config = configOverride != null ? configOverride : this.config;

RibbonProperties ribbon = RibbonProperties.from(config);

builder.setConnectTimeout(ribbon.connectTimeout(this.connectTimeout));

builder.setSocketTimeout(ribbon.readTimeout(this.readTimeout));

builder.setRedirectsEnabled(ribbon.isFollowRedirects(this.followRedirects));

final RequestConfig requestConfig = builder.build();

final LoadBalancedRetryPolicy retryPolicy = loadBalancedRetryFactory.createRetryPolicy(this.getClientName(), this);

RetryCallback<RibbonApacheHttpResponse, Exception> retryCallback = context -> {

//on retries the policy will choose the server and set it in the context

//extract the server and update the request being made

RibbonApacheHttpRequest newRequest = request;

if (context instanceof LoadBalancedRetryContext) {

ServiceInstance service = ((LoadBalancedRetryContext) context).getServiceInstance();

validateServiceInstance(service);

if (service != null) {

//Reconstruct the request URI using the host and port set in the retry context

newRequest = newRequest.withNewUri(UriComponentsBuilder.newInstance().host(service.getHost())

.scheme(service.getUri().getScheme()).userInfo(newRequest.getURI().getUserInfo())

.port(service.getPort()).path(newRequest.getURI().getPath())

.query(newRequest.getURI().getQuery()).fragment(newRequest.getURI().getFragment())

.build().encode().toUri());

}

}

newRequest = getSecureRequest(newRequest, configOverride);

HttpUriRequest httpUriRequest = newRequest.toRequest(requestConfig);

final HttpResponse httpResponse = RetryableRibbonLoadBalancingHttpClient.this.delegate.execute(httpUriRequest);

if (retryPolicy.retryableStatusCode(httpResponse.getStatusLine().getStatusCode())) {

throw new HttpClientStatusCodeException(RetryableRibbonLoadBalancingHttpClient.this.clientName,

httpResponse, HttpClientUtils.createEntity(httpResponse), httpUriRequest.getURI());

}

return new RibbonApacheHttpResponse(httpResponse, httpUriRequest.getURI());

};

LoadBalancedRecoveryCallback<RibbonApacheHttpResponse, HttpResponse> recoveryCallback = new LoadBalancedRecoveryCallback<RibbonApacheHttpResponse, HttpResponse>() {

@Override

protected RibbonApacheHttpResponse createResponse(HttpResponse response, URI uri) {

return new RibbonApacheHttpResponse(response, uri);

}

};

return this.executeWithRetry(request, retryPolicy, retryCallback, recoveryCallback);

}

@Override

public boolean isClientRetryable(ContextAwareRequest request) {

return request!= null && isRequestRetryable(request);

}

private boolean isRequestRetryable(ContextAwareRequest request) {

if (request.getContext() == null || request.getContext().getRetryable() == null) {

return true;

}

return request.getContext().getRetryable();

}

private RibbonApacheHttpResponse executeWithRetry(RibbonApacheHttpRequest request, LoadBalancedRetryPolicy retryPolicy,

RetryCallback<RibbonApacheHttpResponse, Exception> callback,

RecoveryCallback<RibbonApacheHttpResponse> recoveryCallback) throws Exception {

RetryTemplate retryTemplate = new RetryTemplate();

boolean retryable = isRequestRetryable(request);

retryTemplate.setRetryPolicy(retryPolicy == null || !retryable ? new NeverRetryPolicy()

: new RetryPolicy(request, retryPolicy, this, this.getClientName()));

BackOffPolicy backOffPolicy = loadBalancedRetryFactory.createBackOffPolicy(this.getClientName());

retryTemplate.setBackOffPolicy(backOffPolicy == null ? new NoBackOffPolicy() : backOffPolicy);

RetryListener[] retryListeners = this.loadBalancedRetryFactory.createRetryListeners(this.getClientName());

if (retryListeners != null && retryListeners.length != 0) {

retryTemplate.setListeners(retryListeners);

}

return retryTemplate.execute(callback, recoveryCallback);

}

@Override

public RequestSpecificRetryHandler getRequestSpecificRetryHandler(RibbonApacheHttpRequest request, IClientConfig requestConfig) {

return new RequestSpecificRetryHandler(false, false, RetryHandler.DEFAULT, null);

}

static class RetryPolicy extends InterceptorRetryPolicy {

public RetryPolicy(HttpRequest request, LoadBalancedRetryPolicy policy,

ServiceInstanceChooser serviceInstanceChooser, String serviceName) {

super(request, policy, serviceInstanceChooser, serviceName);

}

}

}

  • 这里重点看executeWithRetry,通过设置RetryTemplate的RetryPolicy及BackOffPolicy,来执行重试策略

RetryableOkHttpLoadBalancingClient

spring-cloud-netflix-ribbon-2.0.0.RELEASE-sources.jar!/org/springframework/cloud/netflix/ribbon/okhttp/RetryableOkHttpLoadBalancingClient.java

/**

* An OK HTTP client which leverages Spring Retry to retry failed request.

* @author Ryan Baxter

* @author Gang Li

*/

public class RetryableOkHttpLoadBalancingClient extends OkHttpLoadBalancingClient {

private LoadBalancedRetryFactory loadBalancedRetryFactory;

public RetryableOkHttpLoadBalancingClient(OkHttpClient delegate, IClientConfig config, ServerIntrospector serverIntrospector,

LoadBalancedRetryFactory loadBalancedRetryPolicyFactory) {

super(delegate, config, serverIntrospector);

this.loadBalancedRetryFactory = loadBalancedRetryPolicyFactory;

}

@Override

public boolean isClientRetryable(ContextAwareRequest request) {

return request!= null && isRequestRetryable(request);

}

private boolean isRequestRetryable(ContextAwareRequest request) {

if (request.getContext() == null || request.getContext().getRetryable() == null) {

return true;

}

return request.getContext().getRetryable();

}

private OkHttpRibbonResponse executeWithRetry(OkHttpRibbonRequest request, LoadBalancedRetryPolicy retryPolicy,

RetryCallback<OkHttpRibbonResponse, Exception> callback,

RecoveryCallback<OkHttpRibbonResponse> recoveryCallback) throws Exception {

RetryTemplate retryTemplate = new RetryTemplate();

BackOffPolicy backOffPolicy = loadBalancedRetryFactory.createBackOffPolicy(this.getClientName());

retryTemplate.setBackOffPolicy(backOffPolicy == null ? new NoBackOffPolicy() : backOffPolicy);

RetryListener[] retryListeners = this.loadBalancedRetryFactory.createRetryListeners(this.getClientName());

if (retryListeners != null && retryListeners.length != 0) {

retryTemplate.setListeners(retryListeners);

}

boolean retryable = isRequestRetryable(request);

retryTemplate.setRetryPolicy(retryPolicy == null || !retryable ? new NeverRetryPolicy()

: new RetryPolicy(request, retryPolicy, this, this.getClientName()));

return retryTemplate.execute(callback, recoveryCallback);

}

@Override

public OkHttpRibbonResponse execute(final OkHttpRibbonRequest ribbonRequest,

final IClientConfig configOverride) throws Exception {

final LoadBalancedRetryPolicy retryPolicy = loadBalancedRetryFactory.createRetryPolicy(this.getClientName(), this);

RetryCallback<OkHttpRibbonResponse, Exception> retryCallback = new RetryCallback<OkHttpRibbonResponse, Exception>() {

@Override

public OkHttpRibbonResponse doWithRetry(RetryContext context) throws Exception {

//on retries the policy will choose the server and set it in the context

//extract the server and update the request being made

OkHttpRibbonRequest newRequest = ribbonRequest;

if(context instanceof LoadBalancedRetryContext) {

ServiceInstance service = ((LoadBalancedRetryContext)context).getServiceInstance();

validateServiceInstance(service);

//Reconstruct the request URI using the host and port set in the retry context

newRequest = newRequest.withNewUri(new URI(service.getUri().getScheme(),

newRequest.getURI().getUserInfo(), service.getHost(), service.getPort(),

newRequest.getURI().getPath(), newRequest.getURI().getQuery(),

newRequest.getURI().getFragment()));

}

if (isSecure(configOverride)) {

final URI secureUri = UriComponentsBuilder.fromUri(newRequest.getUri())

.scheme("https").build().toUri();

newRequest = newRequest.withNewUri(secureUri);

}

OkHttpClient httpClient = getOkHttpClient(configOverride, secure);

final Request request = newRequest.toRequest();

Response response = httpClient.newCall(request).execute();

if(retryPolicy.retryableStatusCode(response.code())) {

ResponseBody responseBody = response.peekBody(Integer.MAX_VALUE);

response.close();

throw new OkHttpStatusCodeException(RetryableOkHttpLoadBalancingClient.this.clientName,

response, responseBody, newRequest.getURI());

}

return new OkHttpRibbonResponse(response, newRequest.getUri());

}

};

return this.executeWithRetry(ribbonRequest, retryPolicy, retryCallback, new LoadBalancedRecoveryCallback<OkHttpRibbonResponse, Response>(){

@Override

protected OkHttpRibbonResponse createResponse(Response response, URI uri) {

return new OkHttpRibbonResponse(response, uri);

}

});

}

@Override

public RequestSpecificRetryHandler getRequestSpecificRetryHandler(OkHttpRibbonRequest request, IClientConfig requestConfig) {

return new RequestSpecificRetryHandler(false, false, RetryHandler.DEFAULT, null);

}

static class RetryPolicy extends InterceptorRetryPolicy {

public RetryPolicy(HttpRequest request, LoadBalancedRetryPolicy policy, ServiceInstanceChooser serviceInstanceChooser, String serviceName) {

super(request, policy, serviceInstanceChooser, serviceName);

}

}

}

  • 这里的executeWithRetry,也是通过设置RetryTemplate的RetryPolicy及BackOffPolicy,来执行重试策略

RetryTemplate

spring-retry-1.2.2.RELEASE-sources.jar!/org/springframework/retry/support/RetryTemplate.java

public class RetryTemplate implements RetryOperations {

/**

* Retry context attribute name that indicates the context should be considered global

* state (never closed). TODO: convert this to a flag in the RetryState.

*/

private static final String GLOBAL_STATE = "state.global";

protected final Log logger = LogFactory.getLog(getClass());

private volatile BackOffPolicy backOffPolicy = new NoBackOffPolicy();

private volatile RetryPolicy retryPolicy = new SimpleRetryPolicy(3);

private volatile RetryListener[] listeners = new RetryListener[0];

private RetryContextCache retryContextCache = new MapRetryContextCache();

private boolean throwLastExceptionOnExhausted;

//......

/**

* Execute the callback once if the policy dictates that we can, otherwise execute the

* recovery callback.

* @param recoveryCallback the {@link RecoveryCallback}

* @param retryCallback the {@link RetryCallback}

* @param state the {@link RetryState}

* @param <T> the type of the return value

* @param <E> the exception type to throw

* @see RetryOperations#execute(RetryCallback, RecoveryCallback, RetryState)

* @throws ExhaustedRetryException if the retry has been exhausted.

* @throws E an exception if the retry operation fails

* @return T the retried value

*/

protected <T, E extends Throwable> T doExecute(RetryCallback<T, E> retryCallback,

RecoveryCallback<T> recoveryCallback, RetryState state)

throws E, ExhaustedRetryException {

RetryPolicy retryPolicy = this.retryPolicy;

BackOffPolicy backOffPolicy = this.backOffPolicy;

// Allow the retry policy to initialise itself...

RetryContext context = open(retryPolicy, state);

if (this.logger.isTraceEnabled()) {

this.logger.trace("RetryContext retrieved: " + context);

}

// Make sure the context is available globally for clients who need

// it...

RetrySynchronizationManager.register(context);

Throwable lastException = null;

boolean exhausted = false;

try {

// Give clients a chance to enhance the context...

boolean running = doOpenInterceptors(retryCallback, context);

if (!running) {

throw new TerminatedRetryException(

"Retry terminated abnormally by interceptor before first attempt");

}

// Get or Start the backoff context...

BackOffContext backOffContext = null;

Object resource = context.getAttribute("backOffContext");

if (resource instanceof BackOffContext) {

backOffContext = (BackOffContext) resource;

}

if (backOffContext == null) {

backOffContext = backOffPolicy.start(context);

if (backOffContext != null) {

context.setAttribute("backOffContext", backOffContext);

}

}

/*

* We allow the whole loop to be skipped if the policy or context already

* forbid the first try. This is used in the case of external retry to allow a

* recovery in handleRetryExhausted without the callback processing (which

* would throw an exception).

*/

while (canRetry(retryPolicy, context) && !context.isExhaustedOnly()) {

try {

if (this.logger.isDebugEnabled()) {

this.logger.debug("Retry: count=" + context.getRetryCount());

}

// Reset the last exception, so if we are successful

// the close interceptors will not think we failed...

lastException = null;

return retryCallback.doWithRetry(context);

}

catch (Throwable e) {

lastException = e;

try {

registerThrowable(retryPolicy, state, context, e);

}

catch (Exception ex) {

throw new TerminatedRetryException("Could not register throwable",

ex);

}

finally {

doOnErrorInterceptors(retryCallback, context, e);

}

if (canRetry(retryPolicy, context) && !context.isExhaustedOnly()) {

try {

backOffPolicy.backOff(backOffContext);

}

catch (BackOffInterruptedException ex) {

lastException = e;

// back off was prevented by another thread - fail the retry

if (this.logger.isDebugEnabled()) {

this.logger

.debug("Abort retry because interrupted: count="

+ context.getRetryCount());

}

throw ex;

}

}

if (this.logger.isDebugEnabled()) {

this.logger.debug(

"Checking for rethrow: count=" + context.getRetryCount());

}

if (shouldRethrow(retryPolicy, context, state)) {

if (this.logger.isDebugEnabled()) {

this.logger.debug("Rethrow in retry for policy: count="

+ context.getRetryCount());

}

throw RetryTemplate.<E>wrapIfNecessary(e);

}

}

/*

* A stateful attempt that can retry may rethrow the exception before now,

* but if we get this far in a stateful retry there's a reason for it,

* like a circuit breaker or a rollback classifier.

*/

if (state != null && context.hasAttribute(GLOBAL_STATE)) {

break;

}

}

if (state == null && this.logger.isDebugEnabled()) {

this.logger.debug(

"Retry failed last attempt: count=" + context.getRetryCount());

}

exhausted = true;

return handleRetryExhausted(recoveryCallback, context, state);

}

catch (Throwable e) {

throw RetryTemplate.<E>wrapIfNecessary(e);

}

finally {

close(retryPolicy, context, state, lastException == null || exhausted);

doCloseInterceptors(retryCallback, context, lastException);

RetrySynchronizationManager.clear();

}

}

//......

}

  • 整个retry的逻辑就是通过while控制重试次数,循环内没有返回,到循环外会抛出ExhaustedRetryException
  • BackOffPolicy的backOff方法通过sleeper.sleep执行重试间隔
  • BackOffPolicy有一个子接口SleepingBackOffPolicy,还有一个实现该接口的抽象类StatelessBackOffPolicy
  • SleepingBackOffPolicy的实现类有FixedBackOffPolicy、UniformRandomBackOffPolicy、ExponentialBackOffPolicy、ExponentialRandomBackOffPolicy这三个
  • StatelessBackOffPolicy有FixedBackOffPolicy、NoBackOffPolicy、UniformRandomBackOffPolicy三个子类

小结

ribbon的retry,有两个实现,一个是RetryableRibbonLoadBalancingHttpClient,另外一个是RetryableOkHttpLoadBalancingClient,他们使用的是spring-retry组件的RetryTemplate来实现,该template主要由RetryPolicy以及BackOffPolicy来控制重试。重试大的逻辑就是用while来控制重试次数,然后通过BackOffPolicy来控制重试时间间隔。

doc

  • 9. Retry

Tags:

本文暂时没有评论,来添加一个吧(●'◡'●)

欢迎 发表评论:

最近发表
标签列表