我使用 Spring Boot,在与第三方 REST 服务保持长时间连接时遇到了以下问题:
org.springframework.web.client.ResourceAccessException: I/O error on POST request for "http://localhost:5000/products/10": localhost:5000 failed to respond; nested exception is org.apache.http.NoHttpResponseException: localhost:5000 failed to respond at org.springframework.web.client.RestTemplate.doExecute(RestTemplate.java:732) at org.springframework.web.client.RestTemplate.execute(RestTemplate.java:680) at org.springframework.web.client.RestTemplate.postForObject(RestTemplate.java:435) at com.example.pipeline.domain.service.nlp.NLPService.getDocumentsInfoNew(NLPService.java:42) at com.example.pipeline.domain.batch.steps.NLPTasklet.execute(NLPTasklet.java:170) at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:406) at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:330) at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:140) at org.springframework.batch.core.step.tasklet.TaskletStep$2.doInChunkContext(TaskletStep.java:272) at org.springframework.batch.core.scope.context.StepContextRepeatCallback.doInIteration(StepContextRepeatCallback.java:81) at org.springframework.batch.repeat.support.RepeatTemplate.getNextResult(RepeatTemplate.java:375) at org.springframework.batch.repeat.support.RepeatTemplate.executeInternal(RepeatTemplate.java:215) at org.springframework.batch.repeat.support.RepeatTemplate.iterate(RepeatTemplate.java:145) at org.springframework.batch.core.step.tasklet.TaskletStep.doExecute(TaskletStep.java:257) at org.springframework.batch.core.step.AbstractStep.execute(AbstractStep.java:200) at org.springframework.batch.core.job.SimpleStepHandler.handleStep(SimpleStepHandler.java:148) at org.springframework.batch.core.job.AbstractJob.handleStep(AbstractJob.java:394) at org.springframework.batch.core.job.SimpleJob.doExecute(SimpleJob.java:135) at 
org.springframework.batch.core.job.AbstractJob.execute(AbstractJob.java:308) at org.springframework.batch.core.launch.support.SimpleJobLauncher$1.run(SimpleJobLauncher.java:141) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Caused by: org.apache.http.NoHttpResponseException: localhost:5000 failed to respond at org.apache.http.impl.conn.DefaultHttpRespons
此服务可以将连接保持一小时或更长时间,然后再返回结果。
我的 RestTemplate 配置如下:
/**
 * Builds a {@link RestTemplate} backed by the default Apache HttpClient.
 *
 * @param connectionTimeoutMs connect timeout passed to the request factory, in milliseconds
 * @param readTimeoutMs       read timeout passed to the request factory, in milliseconds
 * @param objectMapper        Jackson mapper installed on the JSON message converter
 * @return the configured template
 * @throws RuntimeException if the template has no {@code MappingJackson2HttpMessageConverter}
 */
public static RestTemplate createRestTemplate(int connectionTimeoutMs, int readTimeoutMs, ObjectMapper objectMapper) {
    HttpComponentsClientHttpRequestFactory httpRequestFactory =
            new HttpComponentsClientHttpRequestFactory(HttpClients.createDefault());
    httpRequestFactory.setConnectTimeout(connectionTimeoutMs);
    httpRequestFactory.setReadTimeout(readTimeoutMs);

    RestTemplate restTemplate = new RestTemplate(httpRequestFactory);

    List<ClientHttpRequestInterceptor> interceptors = new ArrayList<>();
    interceptors.add(new LoggingRequestInterceptor());
    restTemplate.setInterceptors(interceptors);

    // The logging interceptor reads the response body; buffering keeps it readable afterwards.
    restTemplate.setRequestFactory(new BufferingClientHttpRequestFactory(httpRequestFactory));

    MappingJackson2HttpMessageConverter messageConverter = restTemplate.getMessageConverters().stream()
            .filter(MappingJackson2HttpMessageConverter.class::isInstance)
            .map(MappingJackson2HttpMessageConverter.class::cast)
            .findFirst()
            .orElseThrow(() -> new RuntimeException("MappingJackson2HttpMessageConverter not found"));
    messageConverter.setObjectMapper(objectMapper);

    // Every String converter: no Accept-Charset header on writes, UTF-8 as the default charset.
    restTemplate.getMessageConverters().stream()
            .filter(StringHttpMessageConverter.class::isInstance)
            .map(StringHttpMessageConverter.class::cast)
            .forEach(a -> {
                a.setWriteAcceptCharset(false);
                a.setDefaultCharset(StandardCharsets.UTF_8);
            });

    return restTemplate;
}
} // closes the enclosing class (presumably HttpUtils; its header is not part of this excerpt)

// Usage: 60 s connect timeout, 3 h read timeout.
RestTemplate restTemplate = HttpUtils.createRestTemplate(60 * 1000, 3 * 60 * 60 * 1000, objectMapper);
我能否在自己这一端修复这个问题?还是说这纯粹是 http://localhost:5000 服务端的问题,应该在服务端修复?
最后,我采用了以下 RestTemplate 配置:
public class HttpUtils { static final Logger LOGGER = LoggerFactory.getLogger(HttpUtils.class); private static final int HTTP_CLIENT_RETRY_COUNT = 3; private static final int MAXIMUM_TOTAL_COnNECTION= 10; private static final int MAXIMUM_CONNECTION_PER_ROUTE = 5; private static final int CONNECTION_VALIDATE_AFTER_INACTIVITY_MS = 10 * 1000; public static RestTemplate createRestTemplate(int connectionTimeoutMs, int readTimeoutMs, ObjectMapper objectMapper) { HttpClientBuilder clientBuilder = HttpClients.custom(); PoolingHttpClientConnectionManager cOnnectionManager= new PoolingHttpClientConnectionManager(); // Set the maximum number of total open connections. connectionManager.setMaxTotal(MAXIMUM_TOTAL_CONNECTION); // Set the maximum number of concurrent connections per route, which is 2 by default. connectionManager.setDefaultMaxPerRoute(MAXIMUM_CONNECTION_PER_ROUTE); connectionManager.setValidateAfterInactivity(CONNECTION_VALIDATE_AFTER_INACTIVITY_MS); clientBuilder.setConnectionManager(connectionManager); clientBuilder.setRetryHandler(new DefaultHttpRequestRetryHandler(HTTP_CLIENT_RETRY_COUNT, true, new ArrayList<>()) { @Override public boolean retryRequest(IOException exception, int executionCount, HttpContext context) { LOGGER.info("Retry request, execution count: {}, exception: {}", executionCount, exception); return super.retryRequest(exception, executionCount, context); } }); HttpComponentsClientHttpRequestFactory httpRequestFactory = new HttpComponentsClientHttpRequestFactory(clientBuilder.build()); httpRequestFactory.setConnectTimeout(connectionTimeoutMs); httpRequestFactory.setConnectionRequestTimeout(readTimeoutMs); httpRequestFactory.setReadTimeout(readTimeoutMs); RestTemplate restTemplate = new RestTemplate(httpRequestFactory); Listinterceptors = new ArrayList (); interceptors.add(new LoggingRequestInterceptor()); restTemplate.setInterceptors(interceptors); restTemplate.setRequestFactory(new BufferingClientHttpRequestFactory(httpRequestFactory)); 
MappingJackson2HttpMessageConverter messageCOnverter= restTemplate.getMessageConverters().stream().filter(MappingJackson2HttpMessageConverter.class::isInstance) .map(MappingJackson2HttpMessageConverter.class::cast).findFirst().orElseThrow(() -> new RuntimeException("MappingJackson2HttpMessageConverter not found")); messageConverter.setObjectMapper(objectMapper); restTemplate.getMessageConverters().stream().filter(StringHttpMessageConverter.class::isInstance).map(StringHttpMessageConverter.class::cast).forEach(a -> { a.setWriteAcceptCharset(false); a.setDefaultCharset(StandardCharsets.UTF_8); }); return restTemplate; } } public class LoggingRequestInterceptor implements ClientHttpRequestInterceptor { private static final Logger LOGGER = LoggerFactory.getLogger(LoggingRequestInterceptor.class); @Override public ClientHttpResponse intercept(HttpRequest request, byte[] body, ClientHttpRequestExecution execution) throws IOException { traceRequest(request, body); ClientHttpResponse respOnse= execution.execute(request, body); traceResponse(response); return response; } private void traceRequest(HttpRequest request, byte[] body) throws IOException { LOGGER.debug("===========================request begin================================================"); LOGGER.debug("URI : {}", request.getURI()); LOGGER.debug("Method : {}", request.getMethod()); LOGGER.debug("Headers : {}", request.getHeaders()); LOGGER.debug("Request body: {}", new String(body, "UTF-8")); LOGGER.debug("==========================request end================================================"); } private void traceResponse(ClientHttpResponse response) throws IOException { StringBuilder inputStringBuilder = new StringBuilder(); BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(response.getBody(), "UTF-8")); String line = bufferedReader.readLine(); while (line != null) { inputStringBuilder.append(line); inputStringBuilder.append('\n'); line = bufferedReader.readLine(); } 
LOGGER.debug("============================response begin=========================================="); LOGGER.debug("Status code : {}", response.getStatusCode()); LOGGER.debug("Status text : {}", response.getStatusText()); LOGGER.debug("Headers : {}", response.getHeaders()); LOGGER.debug("Response body: {}", inputStringBuilder.toString()); LOGGER.debug("=======================response end================================================="); } }