ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • Spring Batch로 대용량 데이터 처리 비동기화하기
    Spring 2024. 8. 5. 17:46

    문제 상황

    Scoop이라는 프로젝트에 매일 오전 6시와 12시에 네이버 뉴스 기사를 크롤링하는 로직이 있다. 이때, 크롤링한 뉴스를 요약까지 해야했고, 요약에는 GPT API를 사용했다. 또한, 이러한 GPT API는 FastAPI에 구현하여, 다른 서버에 존재한다.

     

    간단한 흐름은 다음과 같다.

     

    1. 네이버 뉴스의 각 카테고리의 뉴스들의 URL을 크롤링한다.

    2. 크롤링한 뉴스 URL을 사용하여 뉴스 본문을 크롤링한다.

    3. 크롤링한 뉴스 본문 내용 메시지를 publish 한다.

    4. FastAPI 서버에서 해당 메시지를 consume하여 뉴스 내용을 요약한다.

    5. 요약된 뉴스 내용 메시지를 publish 한다.

    6. Spring 서버에서 메시지를 consume하여 요약된 뉴스 내용을 DB에 저장한다.

     

    하지만 위 로직이 전부 처리되는 데 약 2분 30초 넘게 걸렸다. 그래서 비동기 처리의 필요성을 느껴 RabbitMQ를 적용하여 두 서버간 데이터를 주고받도록 했다. 하지만 크롤링 및 DB저장에도 18초가 소요되었다.

    Spring Batch 적용 전 크롤링 및 DB 저장에 소요된 시간

     

     

    18초 동안 사용자의 요청을 처리하지 못한다는 것은 큰 문제라고 판단했다. 그래서 크롤링과 DB저장을 비동기처리하는 방법을 찾아봤고 결국 Spring Batch를 적용하기로 했다.

     

    Spring Batch를 선택한 가장 큰 이유는 내 상황에 가장 적절했기 때문이다.

    - 일정 주기(매일 오전6시와 12시)로 실행

    - 실시간 처리가 어려운 대량의 데이터를 처리

    → 이런 작업을 하나의 애플리케이션에서 수행하면 성능 저하를 유발할 수 있으니 배치 애플리케이션을 구현

     

    더불어, 트랜잭션으로 데이터 처리 중 실패한 작업은 롤백하여 데이터 일관성을 유지해준다.

    또한, 배치 작업이 실패하면 재시도해 준다.

     

     

    Spring Batch 적용하기

    구현

     

    이제 Spring Batch를 적용해보겠다.

     

    1. 배치 관련 설정

    1-1. build.gradle에 라이브러리 추가

    implementation 'org.springframework.boot:spring-boot-starter-batch'
    
    testImplementation 'org.springframework.batch:spring-batch-test'

     

     

    1-2. yml

    batch:
      job:
        enabled: false
    • enabled를 true로 설정하면 애플리케이션을 실행할때마다 배치도 실행된다.

     

     

    * 참고로 메인 클래스에 @EnableBatchProcessing 를 추가해야 한다는 설명이 많은데, SpringBoot 3 부터는 원래의 의도대로 사용되지 않는다.

    -> 스프링 부트의 자동설정을 밀어내고, 애플리케이션의 설정을 커스텀하는 용도로 사용된다.

     

    2. Config 구현

    간단한 플로우는 다음과 같다.

    scrapJob
    - crawlingNewsUrlStep
        - newsUrlReader
        - newsUrlWriter
    - crawlingNewsContentStep
        - uncrawledNewsContentReader
        - summarizeContentProcessor
        - newsContentWriter

     

    @Configuration
    @RequiredArgsConstructor
    public class ScrapJobConfig {
    
        @Value("${batch.chunk-size}")
        private int chunkSize;
    
        private final DataSource dataSource;
        private final NewsRepository newsRepository;
        private final ContentProducer contentProducer;
    
        @Bean
        public Job scrapJob(JobRepository jobRepository,
                            Step crawlingNewsUrlStep, Step crawlingNewsContentStep) {
            return new JobBuilder("scrapJob", jobRepository)
                    .incrementer(new RunIdIncrementer())
                    .start(crawlingNewsUrlStep)
                    .next(crawlingNewsContentStep)
                    .build();
        }
    
        @Bean
        @JobScope
        public Step crawlingNewsUrlStep(JobRepository jobRepository,
                                        PlatformTransactionManager transactionManager) {
            return new StepBuilder("crawlingNewsUrlStep", jobRepository)
                    .allowStartIfComplete(true)
                    .<InitNewsDto, InitNewsDto>chunk(30, transactionManager)
                    .reader(newsUrlReader())
                    .writer(newsUrlWriter())
                    .build();
        }
    
        @Bean
        @StepScope
        public ItemReader<InitNewsDto> newsUrlReader() {
            return new NewsUrlReader();
        }
    
        @Bean
        @StepScope
        public JdbcBatchItemWriter<InitNewsDto> newsUrlWriter() {
            return new JdbcBatchItemWriterBuilder<InitNewsDto>()
                    .dataSource(dataSource)
                    .sql("insert into news(url, category) values (:url, :newsCategory)")
                    .beanMapped()
                    .build();
        }
    
        @Bean
        @JobScope
        public Step crawlingNewsContentStep(JobRepository jobRepository,
                                            PlatformTransactionManager transactionManager) {
            return new StepBuilder("crawlingNewsContentStep", jobRepository)
                    .allowStartIfComplete(true)
                    .<PreSummarizedNewsDto, PreSummarizedNewsDto>chunk(chunkSize, transactionManager)
                    .reader(uncrawledNewsContentReader())
                    .processor(summarizeContentProcessor())
                    .writer(newsContentWriter())
                    .build();
        }
    
        @Bean
        @StepScope
        public ItemReader<PreSummarizedNewsDto> uncrawledNewsContentReader() {
            return new UncrawledNewsContentReader(newsRepository);
        }
    
        @Bean
        @StepScope
        public ItemProcessor<PreSummarizedNewsDto, PreSummarizedNewsDto> summarizeContentProcessor() {
            return dto -> {
                contentProducer.sendMessage(new ContentMessageDto(dto.getId(), dto.getContent()));
                return dto;
            };
        }
    
        @Bean
        @StepScope
        public JdbcBatchItemWriter<PreSummarizedNewsDto> newsContentWriter() {
            return new JdbcBatchItemWriterBuilder<PreSummarizedNewsDto>()
                    .dataSource(dataSource)
                    .sql("update news set title = :title, content = :content, post_date = :postDate" +
                            " where id = :id")
                    .beanMapped()
                    .build();
        }
    }

     

    - JdbcBatchItemWriter는 JDBC의 Batch 기능을 사용하여 한번에 Database로 전달하여 Database 내부에서 쿼리들이 실행되도록 한다. (jojoldu 블로그 참고)

     

     

    3. Reader 구현

    - NewsUrlReader

    public class NewsUrlReader implements ItemReader<InitNewsDto> {
        @Value("${crawling.quantity}")
        private int crawlingQuantity;
    
        private final Iterator<NewsCategory> categories;
        private final Queue<InitNewsDto> initNews = new LinkedList<>();
    
        public NewsUrlReader() {
            categories = Arrays.stream(NewsCategory.values()).collect(Collectors.toList()).iterator();
        }
    
        @Override
        public InitNewsDto read() throws IOException {
            while (initNews.isEmpty() && categories.hasNext()) {
                NewsCategory category = categories.next();
                initNews.addAll(scrapCategoryNews(category));
            }
            return initNews.poll();
        }
    
        private List<InitNewsDto> scrapCategoryNews(NewsCategory category) throws IOException {
            Document doc = Jsoup.connect(category.getCategoryUrl()).get();
            Elements newsList = doc.select(".sa_list").select("li");
            if (newsList.size() < crawlingQuantity) {
                return scrapNewsUrl(newsList.size(), newsList, category);
            }
            return scrapNewsUrl(crawlingQuantity, newsList, category);
        }
    
        private List<InitNewsDto> scrapNewsUrl(int quantity, Elements newsList, NewsCategory category) {
            List<InitNewsDto> urls = new ArrayList<>();
            for (int i = 0; i < quantity; i++) {
                Element news = newsList.get(i);
                String url = Objects.requireNonNull(news.selectFirst(".sa_text_title")).attr("href");
                urls.add(new InitNewsDto(category.getName(), url));
            }
            return urls;
        }
    }

     

     

    - UncrawledNewsContentReader

    public class UncrawledNewsContentReader implements ItemReader<PreSummarizedNewsDto> {
        private final NewsRepository newsRepository;
        private Iterator<News> uncrawledNewsContents;
    
        public UncrawledNewsContentReader(NewsRepository newsRepository) {
            this.newsRepository = newsRepository;
            uncrawledNewsContents = newsRepository.findAllByContentIsNull().iterator();
        }
    
        @Override
        public PreSummarizedNewsDto read() throws IOException {
            if (!hasNextUncrawledNews()) {
                return null;
            }
            News news = uncrawledNewsContents.next();
            Document doc = Jsoup.connect(news.getUrl()).get();
            return getNewsContent(news, doc);
        }
    
        private boolean hasNextUncrawledNews() {
            if (!uncrawledNewsContents.hasNext()) {
                uncrawledNewsContents = newsRepository.findAllByContentIsNull().iterator();
            }
            return uncrawledNewsContents.hasNext();
        }
    
        private PreSummarizedNewsDto getNewsContent(News news, Document doc) {
            return PreSummarizedNewsDto.builder()
                    .id(news.getId())
                    .title(scrapTitle(doc))
                    .content(scrapContent(doc))
                    .postDate(scrapPostDate(doc))
                    .build();
        }
    
        private String scrapTitle(final Document doc) {
            Element titleElement = doc.selectFirst("#ct > div.media_end_head.go_trans > div.media_end_head_title > h2");
            if (titleElement == null) {
                titleElement = doc.selectFirst("#content > div.end_ct > div > h2");
            }
            if (titleElement != null) {
                return titleElement.text();
            }
            return null;
        }
    
        private String scrapContent(final Document doc) {
            Elements contentElements = doc.select("article#dic_area");
            if (contentElements.isEmpty()) {
                contentElements = doc.select("#articeBody");
            }
            return contentElements.outerHtml().replaceAll("\\<[^>]*>|\\n", "");
        }
    
        private String scrapPostDate(final Document doc) {
            Element dateElement = doc.selectFirst("div#ct> div.media_end_head.go_trans > div.media_end_head_info.nv_notrans > div.media_end_head_info_datestamp > div > span");
            if (dateElement != null) {
                return dateElement.attr("data-date-time");
            } else {
                Element altDateElement = doc.selectFirst("#content > div.end_ct > div > div.article_info > span > em");
                if (altDateElement != null) {
                    return altDateElement.text();
                }
            }
            return null;
        }
    }
    

     


    Reader의 로직이 길어서 위와 같이 분리하여 따로 클래스로 만들어서 ItemReader를 implements했다. 

     

    나머지 코드는 깃허브를 참고해주세용

     

     

    스프링 5가 되면서 Spring Batch에 변화가 많았다. 이 글을 보는 분들은 막힘없이 구현되길... 🙏🏽

     

     

    결과

    1. 비동기 적용으로 대용량 데이터 처리 중에도 사용자 요청 처리 가능

     

    2. 18초 -> 15초로 단축되는 효과

    Spring Batch 적용 전 크롤링 및 DB 저장에 소요된 시간
    Spring Batch 적용 후 크롤링 및 DB저장에 소요된 시간

     

    시간 단축은 크게 기대하지 않은 부분인데 그래도 의미있는 변화를 얻었다.

     

     

    여담

    배치는 보통 서버를 따로 둔다고 한다. 이후에 MSA를 적용하면 배치 부분은 서버를 따로 분리해 봐야겠다.

     

     

     

    [참고] (많아요 ㅎㅎ..)

    https://medium.com/@qkrgmlrud00/spring-batch%EB%A5%BC-%ED%99%9C%EC%9A%A9%ED%95%9C-%ED%81%AC%EB%A1%A4%EB%A7%81-%EC%9E%91%EC%97%85-%EC%9E%90%EB%8F%99%ED%99%94-d2a19ecf27a4

    https://www.youtube.com/watch?v=1xJU8HfBREY

    https://jojoldu.tistory.com/325

     

    https://github.com/AcornPublishing/definitive-spring-batch

     

    https://velog.io/@calaf/Spring-Batch-%EC%82%AC%EC%9A%A9%ED%95%B4%EB%B3%B4%EA%B8%B0-1.-%EC%8B%9C%EC%9E%91%ED%95%98%EA%B8%B0

     

     

    https://joyfulviper.tistory.com/43

     

    https://medium.com/@itsinil/spring-batch%EB%A5%BC-%ED%86%B5%ED%95%9C-%EB%8C%80%EC%9A%A9%EB%9F%89-%EB%8D%B0%EC%9D%B4%ED%84%B0-%EC%95%88%EC%A0%84%ED%95%98%EA%B2%8C-%EC%B2%98%EB%A6%AC%ED%95%98%EA%B8%B0-d4940e71824b

     

    https://europani.github.io/spring/2023/06/26/052-spring-batch-version5.html

     

    https://jojoldu.tistory.com/507

Designed by Tistory.