10. RestApi Batch
- -
이번 포스트에서는 DB To DB가 아니라 RestApi로 제공되는 데이터를 백업하는 과정을 살펴보자.
RestApi Backup
결재 정보 처리를 위한 Model 처리
먼저 결재 정보를 페이지 단위로 서비스하는 Rest API를 생각해보자.
먼저 sakila 쪽에는 페이지 단위로 결재 정보를 반환하는 쿼리 처리가 필요하다.
<!-- sakila.xml-->
<select id="selectPaymentsByPage" resultMap="paymentBasic">
SELECT payment_id, customer_id, staff_id, rental_id, amount,
payment_date, last_update
FROM payment
ORDER BY payment_id ASC
limit 10 offset #{offset} <!-- [핵심] 페이지 기반 조회 -->
</select>
// SakilaMapper.java
public interface SakilaMapper {
// 월별 결재 정보
public List<Payment> selectMonthlyPayments(String paymentDate);
// 전체 결재 정보
public List<Payment> selectPaymentsByPage(Integer offset);
}
추가로 테스트 편의성을 위해서 backup table을 초기화 하도록 해보자.
<!-- backup-mapper.xml -->
<delete id="clearPayment">
DELETE FROM payment_backup
</delete>
public interface BackupMapper {
public int clearPayment();
public int insertPaymentBackup(Payment payment);
}
여기서는 간단한 예제 구성이어서 Service는 생략한다.
@RestController 구성
다음으로 두 요청을 처리하는 RestController를 구성해보자.
@RestController
@RequestMapping("/api")
@RequiredArgsConstructor
public class RestApiController {
private final SakilaMapper sakilaMapper;
private final BackupMapper BackupMapper;
@GetMapping("/payment")
public ResponseEntity<List<Payment>> getPayments(
@RequestParam(required=false, defaultValue = "1") Integer page) {
return ResponseEntity.ok(sakilaMapper.selectPaymentsByPage((page-1)*10));
}
}
CustomItemReader 작성
ItemStreamReader
이번에 처리할 백업이 기존과 다른점은 데이터 소스가 DB 기반이 아니라 Rest API라는 점이다. 당연히 기존에 사용하던 MyBatisCursorItemReader 같은 객체는 사용할 수가 없고 사용자 정의로 만들어야 한다. 이때 일반적으로 확장하는 인터페이스가 ItemStreamReader이다. ItemStreamReader는 ItemStream과 ItemReader를 상속 받는다.
public interface ItemStreamReader<T> extends ItemStream, ItemReader<T> {}
ItemReader는 앞서 살펴봤으니 ItemStream에 대해 살펴보자.
ItemStream은 Spring Batch에서 상태를 저장하고 복원하는 역할을 하는 인터페이스로 ItemReader, ItemWriter와 함께 사용되며 배치 작업이 중간에 실패했을 때 실패한 지점부터 재시작 할 수 있도록 도와준다.
public interface ItemStream {
/**
* 스트림을 연다. 리소스(파일, DB 등)를 초기화하고
* 재시작 시 이전 실행 상태(ExecutionContext)를 복원하는 로직을 작성한다.
* @param executionContext 현재 Step의 상태 정보가 담긴 객체
*/
default void open(ExecutionContext executionContext) throws ItemStreamException {}
/**
* 현재의 진행 상태를 ExecutionContext에 저장한다.
* 보통 청크(Chunk) 단위로 트랜잭션이 커밋될 때 호출된다.(롤백할 때도 호출되는듯)
* @param executionContext 업데이트할 상태 정보를 담을 객체
*/
default void update(ExecutionContext executionContext) throws ItemStreamException {}
/**
* 사용했던 리소스를 안전하게 닫는다.
* 작업 완료 후 또는 오류 발생 시 호출되어 메모리 누수를 방지.
*/
default void close() throws ItemStreamException {}
}
결국 우리가 할 일은 ItemStream의 open 메서드를 통해 특정 위치에서 데이터를 읽어오도록 하고 ItemReader의 read를 통해 데이터를 읽으면 된다.
PaymentApiItemReader 구현 - temReader 부분
먼저 RestClient를 이용하는 ItemReader 부분을 살펴보자. read()를 재정의한다는 점 말고는 그냥 RestClient를 사용하는 코드이다.
@Slf4j
@Component
public class PaymentApiItemReader implements ItemStreamReader<Payment> {
private static final String API_URL = "http://localhost:8080/api/payment";
private static final String LAST_PAGE_KEY = "api.last.page";
private RestClient restClient;
private List<Payment> currentPageData; // 현재 페이지의 데이터
private int currentPage; // 현재 페이지 번호
private int currentIndex; // 현재 페이지 내에서의 인덱스
// ItemReader 부분 재정의
@Override
public Payment read() throws Exception {
// 여기서 null 체크를 통해 첫 실행 시에도 자연스럽게 데이터를 가져옴
if (currentPageData == null || currentIndex >= currentPageData.size()) {
fetchNextPage();
if (currentPageData == null || currentPageData.isEmpty()) {
return null;
}
currentIndex = 0;
}
return currentPageData.get(currentIndex++);
}
private void fetchNextPage() {
try {
log.debug("🌐 Fetching API: {}?page={}", API_URL, currentPage);
currentPageData = restClient.get()
.uri(API_URL + "?page=" + currentPage)
.retrieve()
.body(new ParameterizedTypeReference<List<Payment>>() {});
int dataSize = (currentPageData != null) ? currentPageData.size() : 0;
log.debug("📦 Received {} items from page {}", dataSize, currentPage);
if (dataSize > 0) {
currentPage++;
}
} catch (Exception e) {
log.error("❌ API call failed: {}", e.getMessage(), e);
currentPageData = null;
throw new ItemStreamException("API 호출 실패", e);
}
}
// Item Stream 부분 재정의
}
PaymentApiItemReader 구현 - IItemStream 부분
다음은 ItemStream 부분이다. 상세한 설명은 주석으로 대체한다.
// Item Stream 부분 재정의
@Override
public void open(ExecutionContext executionContext) throws ItemStreamException {
log.debug("🔓 PaymentApiItemReader opened");
restClient = RestClient.create();
// 상태 복구 로직 담당
if (executionContext.containsKey(LAST_PAGE_KEY)) {
int lastPage = executionContext.getInt(LAST_PAGE_KEY);
currentPage = lastPage + 1;
log.info("🔄 Restarting from page: {}", currentPage);
} else {
currentPage = 1;
log.info("✨ Starting from page: {}", currentPage);
}
currentIndex = 0;
currentPageData = null; // read()에서 fetchNextPage()가 호출되도록 null 초기화
}
@Override
public void update(ExecutionContext executionContext) throws ItemStreamException {
int completedPage = currentPage - 1;
executionContext.putInt(LAST_PAGE_KEY, completedPage);
log.debug("💾 Saved last completed page: {}", completedPage);
}
@Override
public void close() throws ItemStreamException {
log.debug("🔒 PaymentApiItemReader closed");
currentPageData = null;
restClient = null;
}
Batch 처리
backup table 초기화 Tasklet
이번에는 전체 테이블을 초기화 하고 백업받아보기 위해서 먼제 table을 초기화 하는 Tasklet을 작성해보자. 기존의 ReportTasklet 처럼 Tasklet을 구현해서 execute를 재정의 해주면 된다.
@Slf4j
@Component
@RequiredArgsConstructor
public class CleaarPaymentTasklet implements Tasklet {
private final BackupMapper backupMapper;
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext)
throws Exception {
backupMapper.clearPayment();
log.info("✅ payment_backup table cleared successfully");
return RepeatStatus.FINISHED;
}
}
Job 구성하기
이제 이것들을 모아서 하나의 Job을 구성한다.
@Configuration
@RequiredArgsConstructor
@Slf4j
public class PaymentApiBackupBatch {
private static final int CHUNK_SIZE = 10;
@Bean
public Step clearPaymentStep(
JobRepository jobRepository,
@Qualifier("quietjunTransactionManager") PlatformTransactionManager transactionManager,
CleaarPaymentTasklet clearPaymentTasklet) {
return new StepBuilder("clearPaymentStep", jobRepository)
.tasklet(clearPaymentTasklet, transactionManager)
.build();
}
@Bean
@StepScope
ItemProcessor<Payment, Payment> apiPaymentProcessor() {
return payment -> {
// 백업용 필드 추가
payment.setBackupDate(LocalDateTime.now());
payment.setPaymentYm(LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM")));
log.debug("🔧 PROCESS: payment_id : {} ", payment.getPaymentId());
return payment;
};
}
@Bean
Step apiBackupStep(
JobRepository jobRepository,
@Qualifier("quietjunTransactionManager") PlatformTransactionManager transactionManager,
PaymentApiItemReader reader,
@Qualifier("apiPaymentProcessor") ItemProcessor<Payment, Payment> processor,
MyBatisBatchItemWriter<Payment> writer) {
return new StepBuilder("apiBackupStep", jobRepository)
.<Payment, Payment>chunk(CHUNK_SIZE)
.transactionManager(transactionManager)
.reader(reader)
.processor(processor)
.writer(writer)
.build();
}
@Bean
Job apiBackupJob(JobRepository jobRepository,
Step apiBackupStep,
Step clearPaymentStep,
Step reportStep) {
return new JobBuilder("apiBackupJob", jobRepository)
.start(clearPaymentStep)
.next(apiBackupStep)
.next(reportStep)
.build();
}
}
호출하기
이제 Controller에서 호출 해주기만 하면 다시 평화로운 Spring Batch가 진행된다.
@GetMapping("/api-backup")
public ResponseEntity<String> runApiBackup() {
try {
JobParameters params = new JobParametersBuilder()
.addLong("time", System.currentTimeMillis())
.toJobParameters();
JobExecution execution = jobOperator.start(apiBackupJob, params);
return ResponseEntity.ok("API Backup Job 시작: " + execution.getId());
} catch (Exception e) {
return ResponseEntity.status(500)
.body("실패: " + e.getMessage());
}
}'Spring Batch' 카테고리의 다른 글
| 09. Backup Batch - 5 (0) | 2026.01.09 |
|---|---|
| 08. Backup Batch - 4 (0) | 2026.01.08 |
| 07. Backup Batch - 3 (0) | 2026.01.07 |
| 06. Backup Batch - 2 (0) | 2026.01.06 |
| 05. Backup Batch - 1 (0) | 2026.01.05 |
소중한 공감 감사합니다