스프링 배치의 chunk

2023. 8. 9. 16:14Technology[Back]

제가 구현해야할 사항은 웹크롤링을 구현할 때 1개의 검색어로 페이징 단위가 200개씩 1페이지부터 25페이지까지 크롤링하고 job을 끝내고 다음 job 실행시 다른 검색어로 동일한 절차를 진행하는 방식입니다.

이 때 저는 처음에 tasklet 방식으로 구현했고 tasklet 내부에서 for문을 돌면서 1페이지부터 25페이지까지 크롤링하고 DB 데이터 적재는 각 페이지단위로 200개씩 insert를 진행하고 job을 종료하는 방식으로 구현했었는데

위 방식은 중간에 interrupt가 나거나 오류 발생으로 job이 중간에 멈췄을 때 트랜잭션이 tasklet 단위로 묶이다보니 tasklet이 온전히 종료되지 않으면 그동안의 insert문은 실행되지 않은 채 종료되는 문제점이 있었습니다.

위 방식을 해결하기 위해 강제로 트랜잭션을 구성하거나 insert문을 execute하려했으나 이 방식은 스프링배치 사용시 허용되지 않는 방식이었고 for문을 tasklet 외부에서 구현하고 tasklet을 반복실행하려했으나 tasklet방식보다 트랜잭션 처리에 더 유연한 chunk방식을 알게 되어 chunk 방식으로 구현한 예제를 설명드리려 합니다.

1. JOB의 STEP 설정

public Step myStep() {
        return stepBuilderFactory.get("myStep")
        		// chunk 1 단위가 끝날때마다 DB에 SQL push
                .<List<Goods>, List<Goods>>chunk(1)
                .reader(webCrawlingReader)
                .processor(dataProcessor)
                .writer(myBatisItemWriter)
                .build();
    }

2. READER

public class WebCrawlingReader implements ItemReader<List<Goods>>, StepExecutionListener {
	@Override
	public List<Goods> read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
    	// 더 이상 Read할 데이터가 없으면 null을 반환하면 step 종료
        if(goodsList.size() == 0) return null;
        return goodsList;
    }
    
    @Override
	// read 메서드 시작하기 전 호출
	public void beforeStep(StepExecution stepExecution) {
    	// read 전 처리해야할 로직 작성
    }
    
    @Override
	// read 메서드 종료 후 호출
	public ExitStatus afterStep(StepExecution stepExecution) {
    	// read 후 처리해야할 로직 작성
    }
}

3. PROCESSOR

public class DataProcessor implements ItemProcessor<List<Goods>, List<Goods>>{

	@Override
	public List<Goods> process(List<Goods> items) throws Exception {
		// 데이터 전처리 필요 시 작성
		return items;
	}

}

4. WRITER

public class MyBatisItemWriter implements ItemWriter<List<Goods>>{

	@Autowired
	private GoodsService goodsService;

	@Override
	public void write(List<? extends List<Goods>> items) throws Exception {
		// 각 item 순회하며 insert(각 chunk가 끝날 때마다 DB에 적재)
		for(List<Goods> item : items) {
			goodsService.insertGoodsList(item);
		}
	}
}

위 코드를 보시면 STEP 설정 시 chunk(1) 을 부여해 각 chunk 1단위마다 트랜잭션을 부여해서 1단위 chunk 작업이 끝날때마다 DB에 Insert 로직을 실행시키도록 구현했고 각 READER, PROCESSOR, WRITER 로직을 보시면 데이터 가져오기 전, 데이터 가져올 때, 데이터 가져온 후, 데이터 INSERT 이전 전처리, 데이터 INSERT로직 각 단계에서 상당히 유연하게 코드작성이 가능한 점을 알 수 있습니다. 특히나 트랜잭션 관련하여 원하는 단위만큼의 chunk를 개발자가 설정할 수 있다는 측면에서 유용했습니다.

이상입니다. 지금까지 읽어주셔서 감사합니다.