이번 포스팅에서는 로그를 읽어오는 과정에서 발생할 수 있는 여러 문제 중 하나인 트랜잭션 단위 문제에 대해서 다뤘었습니다.
이전 포스팅은 아래 글을 참고해 주시기 바랍니다.
https://hyunily.tistory.com/46
이번 포스팅에서는 Offset 관리에 대해서 다뤄보려고 합니다.
Offset이란?
Offset은 다양한 분야에서 사용되는 개념입니다. 특히, DB 쿼리, 배치 처리, 웹, 분산 시스템 등 여러 곳에서 사용되며, 상대적인 차이를 나타내는 용어로, 특정 데이터나 요소를 찾는 기준점을 기준으로 얼마나 떨어져 있는지를 표현합니다.
가령, 배열이나 리스트와 같은 자료 구조에서 특정 데이터의 상대적인 위치를 나타낼 때 Offset을 사용할 수 있습니다.
이 외에도 배열의 index, DB에서 pagination 등이 있습니다.
Offset 활용 방법
이때까지 구축한 시스템의 대략적인 흐름은 아래와 같습니다.
1. 현재 활성화되어있는 로그파일을 확인한다.
2. 로그마이너를 통해 해당 로그 파일을 읽는다.
3. V$LOGMNR_CONTENTS 테이블로부터 활성화 상태가 아닌 트랜잭션의 데이터를 읽는다.
그럼 위 과정의 문제점은 어떤 게 있을까요?
보통 로그 파일은 순환하면서 기록됩니다. 가령, REDO01.LOG, REDO02.LOG, REDO03.LOG 파일 있다고 가정해 보겠습니다.
현재 활성화되어 있는 로그파일이 만약 1번이라면 1->2->3->1 순으로 순환할 것이고,
활성화되어 있는 로그파일이 만약 2번이라면 2->3->1->2 순으로 순환할 것입니다.
결론적으로 로그파일은 순환을 하며 바뀐다는 게 중요한 포인트입니다.
현재 프로젝트의 문제점 중 하나는 Offset관리가 정확히 되지 않아, 순환도 되지 않을뿐더러, 활성화되어있는 로그파일의 전체 데이터를 계속 가져오고 있습니다.
그래서 이번에는 Offset을 활용해서 이 문제를 해결해보려고 합니다.
Offset Table 생성하기
물론 이렇게 테이블을 생성하지 않고, 원하는 데이터만 기록할 수 있게 테이블을 생성하셔도 됩니다.
일단 마지막 작업을 기록하기 위해 다음과 같은 테이블을 생성했습니다.
마지막 작업을 기록한다는 건, 더 이상 전체 로그파일을 가져올 필요가 없고, 시작 위치를 지정해 줌으로써, 조회 효율성을 높일 수 있습니다.
마지막으로 조회하고 작업한 row의 데이터뿐만 아니라, 해당 row가 기록되어 있는 REDO 로그파일의 버전도 같이 기록함으로써
추후 순환에도 활용할 수 있게 했습니다.
Offset저장 로직
Save Last Work Step 생성
@Bean
public Step saveLastWorkStep(JobRepository jobRepository, PlatformTransactionManager transactionManager){
return new StepBuilder("saveLastWorkStep", jobRepository)
.tasklet(saveLastWorkTasklet(), transactionManager)
.allowStartIfComplete(true)
.build();
}
마지막 로그 데이터를 저장하기 위한 새로운 Step을 생성합니다.
Save Last Work Tasklet 구성
@Bean
public Tasklet saveLastWorkTasklet() {
return (contribution, chunkContext) -> {
ExecutionContext jobExecutionContext = chunkContext.getStepContext().getStepExecution().getJobExecution().getExecutionContext();
List<Map<String, Object>> logContentsResults = (List<Map<String, Object>>) jobExecutionContext.get("logContentsResults");
int numberOfCurrentVersion = (int) jobExecutionContext.get("numberOfCurrentVersion");
if (logContentsResults != null && !logContentsResults.isEmpty()) {
Map<String, Object> lastLogEntry = logContentsResults.get(logContentsResults.size() - 1);
saveLogToDb(lastLogEntry, numberOfCurrentVersion);
} else {
log.warn("No log contents available to save.");
}
return RepeatStatus.FINISHED;
};
}
private void saveLogToDb(Map<String, Object> lastLogEntry, int numberOfCurrentVersion) {
String insertSql = "INSERT INTO SAVE_WORK (RS_ID, OPERATION, TABLE_NAME, REDO_VER) VALUES (?, ?, ?, ?)";
jdbcTemplate.update(insertSql,
lastLogEntry.get("RS_ID"),
lastLogEntry.get("OPERATION"),
lastLogEntry.get("TABLE_NAME"),
numberOfCurrentVersion);
log.info("Successfully saved last work log into SAVE_WORK table.");
}
기존에 로그 데이터를 조회하면서 jobExecutionContext에 로그 데이터 목록과 현재 몇 번 로그파일이 활성화되어 있는지에 대한 정보를 담아서 saveLastWorkTasklet에서 조회할 수 있게 합니다.
담아 온 로그 데이터 목록 중 마지막 데이터 값을 Map형식으로 저장하고, 해당 값과 현재 활성화된 로그파일의 정보를 saveLogToDb메서드에 전달합니다.
전달받은 매개변수를 기반으로, 위에서 만들었던 Offset 정보를 저장하는 테이블에 마지막 row에 대한 정보와 해당 row가 저장되어 있는 REDO 로그파일의 정보를 저장합니다.
결과 확인
위 과정과 같이 마지막으로 처리한 row에 대한 정보를 배치를 돌릴 때마다 저장하게 된다면 아래와 같이 해당 row에 대한 정보가 쌓이게 될 것입니다.
Offset조회 로직
그럼 Offset을 저장하는 과정이 생겼으니, 로그를 조회할때 해당 정보를 활용하는 방법에 대해서도 다뤄보겠습니다.
@Bean
public Tasklet readLastWorkTasklet() {
return ((contribution, chunkContext) -> {
String readSaveWorkSql = String.format(
"""
SELECT IDX, RS_ID, OPERATION, TABLE_NAME, REDO_VER
FROM save_work
WHERE idx = (SELECT MAX(idx) FROM save_work)
"""
);
try {
Map<String, Object> lastWork = jdbcTemplate.queryForMap(readSaveWorkSql);
if (lastWork != null) {
chunkContext.getStepContext().getStepExecution().getJobExecution().getExecutionContext()
.put("lastWork", lastWork);
} else {
log.warn("No entries found in SAVE_WORK table.");
}
} catch (Exception e) {
log.error("Error retrieving last work entry: {}", e.getMessage(), e);
throw e;
}
return RepeatStatus.FINISHED;
});
}
마지막 작업을 저장했던 테이블로부터 가장 최근에 저장된 데이터를 조회해서 chunkContext에 해당 정보를 저장합니다.
로그 데이터 조회 로직 수정
지난 포스팅에서 다뤘었던 트랜잭션 관련 예외처리를 적용한 로직은 아래와 같을 겁니다.
@Bean
public Tasklet readOracleLogStepTasklet() {
return ((contribution, chunkContext) -> {
...
String selectLogContentsSql = "SELECT ROW_ID, RS_ID, OPERATION, SEG_OWNER, TABLE_NAME, XIDUSN, XIDSLT, SQL_REDO " +
"FROM V$LOGMNR_CONTENTS " +
"WHERE TABLE_NAME IN ('USERS', 'COMMENTS', 'EMOJI', 'INTERACTION', 'POST', 'ROLE') " +
"AND (XIDUSN, XIDSLT) NOT IN (SELECT XIDUSN, XIDSLOT FROM V$TRANSACTION WHERE STATUS = 'ACTIVE')";
List<Map<String, Object>> logContentsResults = jdbcTemplate.queryForList(selectLogContentsSql);
...
});
}
해당 로직에서 이제 Offset에 저장된 정보를 기반으로 새로운 데이터만 조회하게 변경되어야 하므로 아래와 같이 쿼리문이 수정되게 됩니다.
@Bean
public Tasklet readOracleLogStepTasklet() {
return ((contribution, chunkContext) -> {
...
ExecutionContext jobExecutionContext = chunkContext.getStepContext().getStepExecution().getJobExecution().getExecutionContext();
Map<String, Object> lastWorkResults = (Map<String, Object>) jobExecutionContext.get("lastWork");
String lastRsId = ((String) lastWorkResults.get("RS_ID")).trim();
String selectLogContentsSql = "SELECT ROW_ID, RS_ID, OPERATION, SEG_OWNER, TABLE_NAME, XIDUSN, XIDSLT, SQL_REDO " +
"FROM V$LOGMNR_CONTENTS " +
"WHERE TABLE_NAME IN ('USERS', 'COMMENTS', 'EMOJI', 'INTERACTION', 'POST', 'ROLE') " +
"AND TRIM(RS_ID) > '" + lastRsId + "' " +
"AND (XIDUSN, XIDSLT) NOT IN (SELECT XIDUSN, XIDSLOT FROM V$TRANSACTION WHERE STATUS = 'ACTIVE')";
List<Map<String, Object>> logContentsResults = jdbcTemplate.queryForList(selectLogContentsSql);
...
});
}
위 로직에서 쿼리문은 Offset 조회 로직에서 가져온 정보 중, RS_ID값을 기반으로 해당 RS_ID값보다 큰 로그 데이터들을 조회하는 내용입니다.
이 과정까지 완료되었다면, 조회할 때마다 항상 전체 데이터를 가지고 오던 부분에 대한 문제 해결은 완료되었습니다.
이제 로그파일 순환에 대한 부분을 다뤄 보겠습니다.
로그파일 순환 설정
이제 조회하고 활용할 수 있는 정보에는 마지막으로 작업했던 로그의 정보, 현재 활성화된 로그파일 정보가 있습니다.
만약 현재 활성화된 로그 파일의 번호와 마지막으로 작업했던 로그의 정보에 저장된 로그 파일의 번호가 일치하다면?
-> 기존의 로직을 그대로 활용
만약 일치하지 않는다면?
-> 이전 배치 작업에서 사용했던 로그 파일에서부터 현재 활성화된 로그 파일까지 모두 살피며 로직을 수행
이 과정대로 코드를 구현한다면 아래와 같은 코드가 완성될 것입니다.
if(numberOfCurrentVersion == redoVersion){
logContentsResults = stepByStep(currentRedoLogFile, lastRsId);
} else{
while(true){
if(numberOfCurrentVersion == redoVersion){
break;
}
currentRedoLogFile = currentRedoLogFile.substring(0,currentRedoLogFile.lastIndexOf(".LOG") - 1) + redoVersion + ".LOG";
for (Map<String, Object> row : stepByStep(currentRedoLogFile, lastRsId)) {
logContentsResults.add(row);
}
redoVersion++;
if(redoVersion % 3 == 1){
redoVersion = 1;
}
}
}
순환 문제 또한 코드에 적용시킨다면 이제 로그 데이터를 조회하는 순서는 아래와 같이 변하게 됩니다.
1. 지난 배치에서 마지막으로 처리한 로그 데이터 정보 조회
2. 해당 로그 데이터 정보를 기반으로 순환 로직 접근
3. 로그 파일을 순환하며 변동된 데이터 추적
이번 포스팅에서는 Offset을 활용해서 로그 파일로부터 전체 데이터가 아닌 변동된 데이터만 추적하는 로직과 로그 파일 순환 원리에 대해서 알아봤습니다.
다음 포스팅에서는 조회했던 로그 데이터들을 어떻게 정제해야 서로 다른 DB가 이 데이터들을 활용해서 데이터 동기화 작업을 할 수 있는지에 대해서 다뤄보겠습니다.
'Daily' 카테고리의 다른 글
[CDC 프로젝트] Skip한 Active Transaction Data 후처리 (0) | 2025.01.29 |
---|---|
[CDC 프로젝트] Uncommitted DML 로그 기록 최적화 방안 (0) | 2025.01.20 |
[CDC 프로젝트] Oracle DB to MySQL(1) (1) | 2025.01.04 |
Scanner vs BufferedReader (1) | 2024.12.19 |
CDC(Change Data Capture)란? (0) | 2024.12.09 |