이전 포스팅에서는 기존 로그 데이터를 가져올 때 활성화 상태인 트랜잭션에 대해서는 제외하고 로그 데이터를 가져왔었습니다.
하지만 이렇게 활성화 상태인 트랜잭션의 데이터들이 후에 완료되고 난 후면, 결국 해당 데이터도 분명히 반영시켜줘야 합니다.
이번 포스팅에서는 해당 문제에 대해서 이해해 보고, 해결하는 방법에 대해서 다뤄보겠습니다.
활성화 상태의 트랜잭션을 제외한 로그 데이터를 수집하는 내용은 아래 포스팅에 있으니 참고하시기 바랍니다.
https://hyunily.tistory.com/46
문제 이해하기
기존 로직대로 데이터를 조회한다면 아래 그림 과정처럼 진행될 겁니다.
그럼 위 그림에서 가질 수 있는 의문점은 "활성화된 트랜잭션이 만약 commit이 완료된 상태로 바뀐다면?"입니다.
결국 마무리된 트랜잭션 데이터 정보도 스프링 배치가 수집해야 될 데이터의 대상이 될 것이고, 필연적으로 동기화가 진행되어야 할 것입니다.
그럼 활성화 되어있는 트랜잭션의 정보는 어떻게 처리해야 할까에 대해서 본격적으로 다뤄보겠습니다.
활성화 트랜잭션 정보 수집하기
마무리된 트랜잭션에 대해서 정보를 수집하는 부분에 대해서는 이전에 다뤘었습니다. 그럼 그 역으로 접근한다면 결국 활성화 상태의 트랜잭션 정보도 수집할 수 있습니다.
SELECT RS_ID, OPERATION, SEG_OWNER, TABLE_NAME, SQL_REDO, XIDUSN, XIDSLT
FROM V$LOGMNR_CONTENTS
WHERE TABLE_NAME IN ('USER', 'COMMENT', 'EMOJI', 'INTERACTION', 'POST', 'ROLE')
AND (XIDUSN, XIDSLT) IN (SELECT XIDUSN, XIDSLOT FROM V$TRANSACTION WHERE STATUS = 'ACTIVE');
위 쿼리를 활용하면 commit 되지 않은 트랜잭션 정보들을 얻을 수 있습니다.
그럼 실제로 저장하고 기억하고 있어야 할 정보들은 해당 트랜잭션을 고유하게 구별할 수 있는 값과, 해당 트랜잭션 정보를 담고 있는 로그파일의 버전 정보일 것입니다.
그 이유는 결국 현재 스프링 배치에서 처리하지 못한 트랜잭션은 다음 배치가 데이터를 동기화할 때, 다시 해당 트랜잭션의 상태를 확인하고, 만약 commit상태가 되어있다면 해당 데이터를 우선적으로 동기화해야 하기 때문입니다.
활성화 트랜잭션 정보 저장 로직
그럼 기존 코드에 활성화 트랜잭션 정보를 저장하는 로직을 추가해 보겠습니다.
public List<Map<String, Object>> stepByStep(String currentRedoLogFile, String lastRsId) {
...
// 진행중인 트랜잭션이 있는지 조회
String selectActiveTransactionSql = "SELECT ROW_ID, RS_ID, OPERATION, SEG_OWNER, TABLE_NAME, XIDUSN, XIDSLT, SQL_REDO " +
"FROM V$LOGMNR_CONTENTS " +
"WHERE TABLE_NAME IN ('USERS', 'COMMENTS', 'EMOJI', 'INTERACTION', 'POST', 'ROLE') " +
"AND TRIM(RS_ID) > '" + lastRsId + "' " +
"AND (XIDUSN, XIDSLT) IN (SELECT XIDUSN, XIDSLOT FROM V$TRANSACTION WHERE STATUS = 'ACTIVE')";
List<Map<String, Object>> activeTransactionResults = jdbcTemplate.queryForList(selectActiveTransactionSql);
// 만약 진행중인 트랜잭션이 존재하면, 해당 트랜잭션 정보 저장
if(!activeTransactionResults.isEmpty()) {
for(Map<String, Object> row : activeTransactionResults) {
String numberOfRedoLogFile = currentRedoLogFile.substring(currentRedoLogFile.length() - 5, currentRedoLogFile.length() - 4);
String insertSql = "INSERT INTO ACTIVE_TRANS (IDX, XIDUSN, XIDSLT, REDO_VER) VALUES (NULL, ?, ?, ?)";
jdbcTemplate.update(insertSql,
row.get("XIDUSN"),
row.get("XIDSLT"),
numberOfRedoLogFile);
}
}
...
}
기존 로직에서 로그마이너를 실행한 바로 직후, 일단 트랜잭션 정보에 대해서 조회합니다.
만약 진행중인 트랜잭션이 존재한다면 XIDUSN, XIDSLT, REDO_VER의 정보를 저장해 주는 로직을 추가합니다.
Offset관리 테이블 구조 변경
ACTIVE_TRANS 테이블은 트랜잭션 정보만 담고있는 테이블입니다. 이 또한 매번 전체 트랜잭션의 정보를 가져올 필요 없이, 처리되지 않은 최신 트랜잭션 정보들만 가져오면 됩니다.
그래서 ACTIVE_TRANS 테이블의 IDX를 Offset관리에서 추가로 저장해서 관리해야 합니다.
그러므로 기존 SAVE_WORK 테이블에 새로운 컬럼인 TRANS_IDX도 추가해 주겠습니다.
ALTER TABLE SAVE_WORK
ADD TRANS_IDX NUMBER;
Offset 데이터 조회 로직 변경
컬럼이 추가됨에 따라, 조회하는 로직에도 변동이 생깁니다.
@Bean
public Tasklet readLastWorkTasklet() {
return ((contribution, chunkContext) -> {
String readSaveWorkSql = String.format(
"""
SELECT IDX, RS_ID, OPERATION, TABLE_NAME, REDO_VER, TRANS_IDX
FROM SAVE_WORK
WHERE IDX = (SELECT MAX(IDX) FROM SAVE_WORK)
"""
);
try {
Map<String, Object> lastWork = jdbcTemplate.queryForMap(readSaveWorkSql);
if (lastWork != null) {
chunkContext.getStepContext().getStepExecution().getJobExecution().getExecutionContext()
.put("lastWork", lastWork);
} else {
log.warn("No entries found in SAVE_WORK table.");
}
} catch (Exception e) {
log.error("Error retrieving last work entry: {}", e.getMessage(), e);
throw e;
}
return RepeatStatus.FINISHED;
});
}
기존 활성화 트랜잭션 데이터 후처리 메서드 생성
활성화 트랜잭션 데이터를 저장하는 로직도 추가했으니, 이제 배치가 시작될 때 이전에 저장되어 있던 트랜잭션 정보들을 활용해서 다시 해당 데이터들을 후처리 해주는 로직도 추가해줘야 합니다.
public Long activeTransStepByStep(Long transIdx) {
String readActiveTransSql;
if (transIdx == 0L) {
readActiveTransSql = "SELECT IDX, XIDUSN, XIDSLT, REDO_VER " +
"FROM ( " +
" SELECT IDX, XIDUSN, XIDSLT, REDO_VER, " +
" ROW_NUMBER() OVER (PARTITION BY XIDUSN, XIDSLT, REDO_VER ORDER BY IDX DESC) as rn " +
" FROM ACTIVE_TRANS " +
") subquery " +
"WHERE rn = 1 " +
"ORDER BY IDX";
} else {
readActiveTransSql = "SELECT IDX, XIDUSN, XIDSLT, REDO_VER " +
"FROM ( " +
" SELECT IDX, XIDUSN, XIDSLT, REDO_VER, " +
" ROW_NUMBER() OVER (PARTITION BY XIDUSN, XIDSLT, REDO_VER ORDER BY IDX DESC) as rn " +
" FROM ACTIVE_TRANS " +
" WHERE IDX > " + transIdx + " " +
") subquery " +
"WHERE rn = 1 " +
"ORDER BY IDX";
}
List<Map<String, Object>> activeTransactionResults = jdbcTemplate.queryForList(readActiveTransSql);
long lastIdx = 0L;
if (!activeTransactionResults.isEmpty()) {
for (Map<String, Object> result : activeTransactionResults) {
long idx = ((BigDecimal) result.get("IDX")).longValue();
if (idx > lastIdx) {
lastIdx = idx;
}
}
}
String prefix = 로그 파일 위치 경로;
String suffix = ".LOG";
List<Map<String, Object>> logContentsResults = new ArrayList<>();
for(Map<String, Object> row : activeTransactionResults) {
// Step 1: DBMS_LOGMNR.ADD_LOGFILE 실행
String addLogFileSql = String.format(
"BEGIN DBMS_LOGMNR.ADD_LOGFILE(" +
"LOGFILENAME => '%s', " +
"OPTIONS => DBMS_LOGMNR.NEW); END;", prefix + row.get("REDO_VER") + suffix
);
jdbcTemplate.execute(addLogFileSql);
// Step 2: DBMS_LOGMNR.START_LOGMNR 실행
String startLogMinerSql = "BEGIN DBMS_LOGMNR.START_LOGMNR(" +
"OPTIONS => DBMS_LOGMNR.DICT_FROM_ONLINE_CATALOG); END;";
jdbcTemplate.execute(startLogMinerSql);
String selectLogContentsSql = "SELECT ROW_ID, RS_ID, OPERATION, SEG_OWNER, TABLE_NAME, XIDUSN, XIDSLT, SQL_REDO " +
"FROM V$LOGMNR_CONTENTS " +
"WHERE TABLE_NAME IN ('USERS', 'COMMENTS', 'EMOJI', 'INTERACTION', 'POST', 'ROLE') " +
"AND XIDUSN = " + row.get("XIDUSN") + " AND XIDSLT = " + row.get("XIDSLT");
logContentsResults = jdbcTemplate.queryForList(selectLogContentsSql);
}
for (Map<String, Object> row : logContentsResults) {
kafkaTemplate.send(KAFKA_TOPIC, (String) row.get("ROW_ID"), row);
}
return lastIdx;
}
반복되는 과정이지만, 혹여나 해당 트랜잭션이 다른 로그 파일에 저장되어 있는 경우를 대비해 기존에 데이터를 카프카에 전송하는 과정을 반복합니다.
readActiveTransSql은 ACTIVE_TRANS 테이블에서 각 (XIDUSN, XIDSLT, REDO_VER) 조합에 대해 IDX가 가장 큰(최신) 행만을 선택하고, 이 결과들을 IDX 값 기준으로 오름차순으로 정렬하는 쿼리입니다.
마지막 반환값이 lastIdx 즉, 마지막 처리 IDX를 반환하는 이유는 위에서 변경한 SAVE_WORK 컬럼에 이제 추가로 저장되어야 하기 때문에 이를 반환시켰습니다.
기존 메인 로직 수정
후처리 메서드 생성도 완료했으니, 메인 로직에서 해당 메서드를 호출해서 우선적으로 트랜잭션 정보 데이터를 처리하고 나서 최신 로그 파일들에 대해 처리할 수 있게 로직을 변경해 줍니다.
추가되는 코드가 위치해야 될 부분은 메인 로직을 실행하는 부분 위쪽입니다.
@Bean
public Tasklet readOracleLogStepTasklet() {
return ((contribution, chunkContext) -> {
...
BigDecimal transIdx = (BigDecimal) lastWorkResults.get("TRANS_IDX");
Long activeTransLastIdx;
if (transIdx != null) {
activeTransLastIdx = activeTransStepByStep(transIdx.longValue());
} else {
System.out.println("TRANS_IDX is null!");
activeTransLastIdx = activeTransStepByStep(0L);
}
chunkContext.getStepContext().getStepExecution().getJobExecution().getExecutionContext()
.put("activeTransLastIdx", activeTransLastIdx);
...
});
}
Offset 정보 저장 로직 변경
마지막으로 Offset에 어디까지 ACTIVE_TRANS 테이블 작업이 완료되었는지에 대해서 추가로 저장해줘야 합니다.
@Bean
public Tasklet saveLastWorkTasklet() {
return (contribution, chunkContext) -> {
ExecutionContext jobExecutionContext = chunkContext.getStepContext().getStepExecution().getJobExecution().getExecutionContext();
List<Map<String, Object>> logContentsResults = (List<Map<String, Object>>) jobExecutionContext.get("logContentsResults");
Long activeTransLastIdx = (Long) jobExecutionContext.get("activeTransLastIdx");
int numberOfCurrentVersion = (int) jobExecutionContext.get("numberOfCurrentVersion");
if (logContentsResults != null && !logContentsResults.isEmpty()) {
Map<String, Object> lastLogEntry = logContentsResults.get(logContentsResults.size() - 1);
saveLogToDb(lastLogEntry, numberOfCurrentVersion, activeTransLastIdx);
} else {
log.warn("No log contents available to save.");
}
return RepeatStatus.FINISHED;
};
}
private void saveLogToDb(Map<String, Object> lastLogEntry, int numberOfCurrentVersion, Long activeTransLastIdx) {
String insertSql = "INSERT INTO SAVE_WORK (RS_ID, OPERATION, TABLE_NAME, REDO_VER, TRANS_IDX) VALUES (?, ?, ?, ?, ?)";
jdbcTemplate.update(insertSql,
lastLogEntry.get("RS_ID"),
lastLogEntry.get("OPERATION"),
lastLogEntry.get("TABLE_NAME"),
numberOfCurrentVersion,
activeTransLastIdx);
}
이처럼 로직을 수정하고 나면 이제 누락되는 데이터가 없이 동기화가 완료되는 로직이 완성될 것입니다.
물론 아직 부족한 부분이 많고, 코드적으로 반복되는 부분에 있어서 최적화를 진행해야 하는 부분은 확실히 존재합니다.
최적화와 성능 부분에 대해서는 다음 포스팅에서 자세히 다뤄보겠습니다.
부족한 부분에 대해서는 가차 없이 말씀 주시면 감사하겠습니다!!
'Daily' 카테고리의 다른 글
[CDC 프로젝트] Offset을 활용한 로그 읽기 (0) | 2025.01.21 |
---|---|
[CDC 프로젝트] Uncommitted DML 로그 기록 최적화 방안 (0) | 2025.01.20 |
[CDC 프로젝트] Oracle DB to MySQL(1) (1) | 2025.01.04 |
Scanner vs BufferedReader (1) | 2024.12.19 |
CDC(Change Data Capture)란? (0) | 2024.12.09 |