스프링 배치의 실패시 재시도

2023. 8. 9. 16:42Technology[Back]

웹크롤링 시 외부API의 도움을 받거나 정적/동적 크롤링 라이브러리로 http 요청을 보내다보니 자주 웹소켓문제나 웹응답이 없는 등 예외가 발생하다보니 1번의 시도로는 배치의 안정적인 실행을 보장할 수 없는 상황이었습니다. 이를 해결하기 위해 재시도 로직을 구현해서 READER에서 3번의 재시도 & STEP에서 3번의 재시도 & STEP 실패시마다 Slack 알림을 이용해서 정기적 배치의 안정성을 높이고자 했습니다.

1. READER

public class WebCrawlingReader implements ItemReader<List<Goods>>, StepExecutionListener {
	
    private static final ThreadLocal<Integer> Count = new ThreadLocal<>();
    
    @Override
	public List<Goods> read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
    	// 총 3번까지 재시도 가능
        Count.set(0);
        
        while(true) {
            Boolean isOk = true;

            // API 요청 URL 생성
            apiUrl = API_URL + "?key=" + API_KEY + "&apiCode=ProductInfo" + "&productCode=" + product.getProductCode();

            // API 요청을 위한 HttpURLConnection 객체 생성
            url = new URL(apiUrl);
            connection = (HttpURLConnection) url.openConnection();
            connection.setRequestMethod("GET");

            // API 응답 확인
            responseCode = connection.getResponseCode();
            if (responseCode == HttpURLConnection.HTTP_OK) {
                // API 응답 데이터 읽기
                in = new BufferedReader(new InputStreamReader(connection.getInputStream(), "EUC-KR"));
                response = new StringBuilder();
                while ((line = in.readLine()) != null) {
                    response.append(line);
                }
                in.close();

                responseXml = response.toString();

                // 제대로 된 응답을 받지 못하거나 오류가 있는 경우 해당 상품 넘김
                if(responseXml == null || responseXml.equals("")) break;
                if(responseXml.contains("ErrorResponse")) {
                    log.get().info("ErrorResponse is occured");
                    break;
                }

                jaxbContext = JAXBContext.newInstance(ProductInfoResponse.class);
                unmarshaller = jaxbContext.createUnmarshaller();

                // 응답을 ProductInfoResponse 객체로 만들어서 사용
                ProductInfoResponse responseString2 = (ProductInfoResponse) unmarshaller.unmarshal(new StringReader(responseXml));
                Product product2 = responseString2.getProduct();

                // 앞선 정보들 중 하나라도 null이거나 데이터가 존재하지 않는 경우 isOk = false
                if(isOk) {
                    goodsList.add(goods);
                    // INSERT 상품개수 누적
                    insert++;
                    break;
                }

                Count.set(Count.get() + 1);
                if(Count.get() > 3) {
                    throw new Exception("Product API callCount over 3");
                }
            }
        }
    }
    
    @Override
	// read 메서드 시작하기 전 호출
	public void beforeStep(StepExecution stepExecution) {
    	// read 전 처리해야할 로직 작성
    }
    
    @Override
	// read 메서드 종료 후 호출
	public ExitStatus afterStep(StepExecution stepExecution) {
    	// read 후 처리해야할 로직 작성
    }
}

위 코드에서 Count를 이용해서 우선 3번까지 재시도 할 수 있도록 로직을 짜고 3번 모두 실패하게 되면..

 

2.JOB

public Job myJob() {
    Step step = myStep();
    return this.jobBuilderFactory.get("myJob")
            // BeforeJob, AfterJob 호출
            .listener(jobCompletionNotificationListener)
            /* step start */
            .start(step)
            .on("FAILED").to(timeoutDecider) // 실패 시 timeoutDecider 호출
            .on("COMPLETED").end() // 성공 시 step end
            .from(timeoutDecider)
                .on("RESTART").to(step) // timeoutDecider에서 RESTART 발생 시 step 재실행
                .on("COMPLETED").end() // timeoutDecider에서 COMPLETE 발생 시 step end
                .on("*").fail() // timeoutDecider에서 다른 이벤트 발생 시 fail 처리(Job Failed)
            .end()
            /* step end */
            .incrementer(new RunIdIncrementer()) // job이 중복되지 않도록 id 부여
            .build();
}

3. TimeoutDecider

@Component
public class TimeoutDecider implements JobExecutionDecider {

    private final Map<Long, Integer> retryCounts = new HashMap<>();
    @Autowired
    private SlackService slackService;

    @Override
    public FlowExecutionStatus decide(JobExecution jobExecution, StepExecution stepExecution) {
        long jobExecutionId = jobExecution.getId();
        
        // 새롭게 시작한 job의 경우 재시도횟수 3회 부여
        if (!retryCounts.containsKey(jobExecutionId)) {
            retryCounts.put(jobExecutionId, 3);
        }

        int currentRetryCount = retryCounts.get(jobExecutionId);
        
        // Slack 메시지 작성
        String msg = "[account] " + jobExecution.getExecutionContext().get("account") + "\n";
        msg += "[target] " + jobExecution.getExecutionContext().get("target") + "\n";
        msg += "[startPageNum] " + jobExecution.getExecutionContext().get("startPageNum") + "\n";
        retryCounts.put(jobExecutionId, currentRetryCount - 1);
        msg += "[remainCount] " + retryCounts.get(jobExecutionId) + "번 남았습니다.\n";

        // 재시도횟수 모두 소진한 경우
        if (currentRetryCount == 0) {
            return new FlowExecutionStatus("FAILED");
        } 
        // 실패 없이 성공한 경우
        else if (stepExecution.getFailureExceptions().isEmpty()) {
            log.info("No failure exceptions. Job completed.");
            return new FlowExecutionStatus("COMPLETED");
        } 
        // 실패한 경우
        else {
            log.info("Failure exceptions occurred. Retry this.");
            msg += "[errorLog] " + stepExecution.getFailureExceptions();
            slackService.call(0, msg);
            return new FlowExecutionStatus("RESTART");
        }
    }
}

위 코드를 보면 JOB 에서 STEP을 시작할 때 "FAILED" 상태로 넘어오면 TimeoutDecider로 보내서 jobExecutionId 마다 재시도횟수를 3회 부여하고 재시도 횟수가 존재하면서 실패했으면 Slack에 알림을 띄우고 "RESTART" 상태로 Return하게되는데 JOB에서 STEP을 보면

.from(timeoutDecider)

.on("RESTART").to(step)

TimeoutDecider에서 "RESTART" 상태로 넘어오면 STEP을 재시작하고 계속 FAILED 상태 시 TimeoutDecider에서 제공한 재시도횟수 3회가 모두 소진되면 "FAILED" 상태로 Return 하게되고

.from(timeoutDecider)

.on("*").fail()

"FAILED" 상태는 별도로 지정하지 않았으므로 ALL에 걸려서 최종적으로 FAIL처리되게 됩니다. 만약 어떤 단계에서라도 성공하면 COMPLETE 상태로 JOB을 종료하게됩니다.

 

이상입니다. 지금까지 읽어주셔서 감사합니다.