kafka와 Algo
[Algo] 백준 33851번
- 문제 링크
- 아이디어를 떠올렸음. 근데 bfs 함수 돌리는 걸 함수 안에서 또 처리하는 중복 상태 발생. 그래서 시간 복잡도 무조건 초과하는 아이디어였음.
- 전처리 하듯이, 중복되는 bfs를 밖으로 꺼낸다고 치면, 결국 모든 정점간의 최단 거리를 구하는 전처리를 진행하면 됨
- 그러고 나서 계산하려는 거 하면 되는..
- 주의해야 했던 점
- (1) 시간초과가 날 땐 인접행렬로 인한 이중 for문 조회가 문제가 되진 않는지 살펴본다.
- (2) 인접한 것과 갈 수 있는 것의 차이는 다르다. 이걸 그래프나 행렬로 표현할 때 초기값을 -1로 줘서 갈 수 있는지 없는지 반드시 구분해야 한다. 이거 빼먹으면 말짱 도루묵임
[Kafka] 학습 로그 받아올 때 트러블 슈팅 기록
카프카 사용
[기존 상황]
- redis를 단순 버퍼로 쓰려던 계획
[변경 상황]
- kafka를 메시지 브로커로 도입
- 비동기 데이터 파이프라인
전체 아키텍처 개요
AI서버 → Push → Kafka → Pull → Spring
상세 데이터 흐름
- topic = 고속도로 설계
- topic name:
study-log-events - 파티션 개수: 3개 정도? - 병렬 처리하려고
- replication? 로컬 개발용으로 1개, 운영 환경 상에서 3개
- topic name:
- AI 서버 = 프로듀셔 역할
- AI 모델이 이미지를 분석한 직후 결과를 카프카로 쏘아 보냄
- 페이로드
- sessionId
- eventType
- timestamp
- 메시지를 보낼 때 Key = sessionId로 지정해야 함
- 같은 Key를 가진 메시지를 항상 같은 파티션에 지정함
- 그래야 사용자 A의 로그 순서 (집중 → 딴짓 → 집중)이 뒤섞이지 않음
- 카프카 클러스터 (버퍼)
- 스프링 서버가 점검 중이거나 잠시 멈춰도 카프카는 안전하게 데이터가 쌓임
- 스프링 부트
@KafkaListener를 사용해서 데이터를 가져옴. 이때 Batch Listener를 사용함- 기존의 경우라면: 1개 DB에 저장함
- 카프카 적용: 카프카에서 로그 500개 줘 → 500개 한번에 받고 → 메모리에서 통계를 계산함 → DB에 1번 업데이트로 충분해짐
왜 카프카를 써야 하는가?
- 처리량
- 초당 12회 이상 발생하는 로그 데이터를 Redis가 메모리에서 전부 감당하기엔 비용이 비쌉니다. Kafka는 디스크 기반이면서도 순차 I/O를 사용하여 처리량이 월등히 높음
- 데이터 유실 방지
- Redis는 인메모리라 서버가 꺼지면 데이터가 날아갈 위험이 크지만, Kafka는 디스크에 영속 저장하므로 소비(Consume)하지 못한 데이터도 안전함
- BackPressure 조절
- AI 서버가 폭주해서 데이터를 쏟아내도, Spring 서버는 자신의 속도에 맞춰서(
poll()) 가져오면 되므로 서버가 다운되지 않음
- AI 서버가 폭주해서 데이터를 쏟아내도, Spring 서버는 자신의 속도에 맞춰서(
카프카 서버를 도커에 어떻게 띄웠을까?
-
docker-compose.yml zookeeper의 역할
- 카프카 브로커가 살아있는지 상태 정보를 관리함
- 카프카를 켜려면 무조건 켜져 있어야 하는 것
포트가 2개인 이유
- Spring, AI ⇒ 내부용:
kafka:29092포트를 사용- 같은 도커 네트워크 안에 있기 때문에 host port 밖으로 갔다 오지 않아도 됨
- 개발자 로컬 PC ⇒ 외부용:
localhost:9092포트를 사용- 로컬에서 kafka-console 테스트를 하거나, DB 툴로 접속할 때는 밖에서 안으로 들어와야 함
-
application-dev.yml
- consumer: AI 서버가 보낸 로그를 읽어올 때 필요한 규칙
group-id: virtudy-group필요한 이유- 부하 분산: 백엔드 서버를 3대 띄운다고 치면 카프카가 메시지를 3명에게 나눠서 줄 수 있다는 점이 있음
- 상태 저장: 어디까지 읽었는지 offsest을 ID 기준으로 카프카가 기억함. 그래서 서버가 재부팅되어도 이어서 읽을 수 있음
auto-offset-rest: earliest- 우리가 어디까지 읽었는지 까먹었거나(Offset 정보 없음), 처음 접속했다면 맨 처음부터 다시 읽어와의 역할을 한다.
earliest: 맨 옛날 데이터부터 다 가져옴. (데이터 유실 방지 - 추천)latest: 지금 이 순간부터 들어오는 것만 받음. (과거 데이터 버림)
key/value-deserializer(해독기)- 카프카에는
010101같은 바이트 덩어리로 저장되어 있으니, 이걸 다시 자바 객체(String, JSON)로 바꿔달라는 뜻임 - 설정:
key: 메시지 키는 보통 ID니까String으로 해독.value: 메시지 내용은 JSON이니까JsonDeserializer로 해독해서 DTO 객체로 변환.
- 카프카에는
spring.json.trusted.packages: "*"(⚠️ 트러블 슈팅 단골)- 어떤 패키지의 DTO가 오더라도 의심하지 말고 변환해줘 라는 뜻
- 왜 필요한가?
- 보안상의 이유로 Spring은 처음 보는 패키지의 객체로 변환하는 걸 막음.
- 이걸(전체 허용)로 안 해주면, 패키지 경로가 조금만 달라도 “이 객체는 위험해서 못 바꿉니다” 라며 에러가 터짐
- 실무 팁: 개발 편의성을 위해 를 쓰지만, 보안이 극도로 중요한 금융권에서는 특정 패키지(
com.ssafy.virtudy.*)만 적기도 함
- producer: 백엔드 서버 → 카프카로 메시지 보낼 때 필요한 규칙
key/value-serializer(암호화기)- 자바 객체(DTO)를 카프카가 이해할 수 있는 바이트 덩어리로 포장해줘 라는 뜻
- 설정:
key:StringSerializer(문자열 -> 바이트)value:JsonSerializer(자바 객체 -> JSON 바이트)
- consumer: AI 서버가 보낸 로그를 읽어올 때 필요한 규칙
List 형태로 받을 때 Batch를 써야 해
public void consume(List<StudyLogRequest> logs) { ... }
- 만약 List로 받지 않는다면 → 디폴트로 단 건 모드 Single로 동작함
- 그럼 왜 Batch로 받아야 할까?
- 상황 가정: AI가 로그를 1초에 10개씩 보낸다
- 배치를 안 쓰는 경우:
- 메시지 1개 → DB 연결 → INSERT → 트랜잭션 커밋 → 연결 해제 (이걸 1초에 10번 반복해야 함)
- 배치를 쓰는 경우:
- 메시지가 #개 쌓일 때까지 wait 또는 있는 만큼 바로 가져옴 → List 형태 → DB 연결 → saveAll() : Bulk Insert → 트랜잭션 커밋 1회
- 이점: JPA의 Dirty checking이나 Bulk Insert 효율을 극대화할 수 있음 → 대량 트래픽 여유롭게 처리 가능
카프카에 잘못된 데이터를 날렸을 때의 트러블 슈팅
- 카프카는 메시지를 한 번 읽고 버리는 게 아니라, 디스크에 파일로 저장해둔다.
- 그래서 컨테이너를 껐다 켜도, 아까 보냈던 “잘못된 형식의 데이터(Poison Pill)”가 여전히
offset 0번에 남아 있습니다. - 매핑을 돕기 위한 필수 책
- 해결책:
application-local.yml에 “헤더 없으면 무조건StudyLogRequest로 변환해!”라고 알려주는 설정을 추가해야 함 spring.json.value.default.type: "com.ssafy.virtudy.study.dto.StudyLogRequest"
- 해결책:
카프카는 그럼 버퍼인가 DB인가
1단계: 카프카의 저장 - 버퍼
- 행동: AI가 데이터를 보내면, 카프카는 자기네 하드디스크(파일 시스템)에 이 데이터를
.log파일 형태로 저장합니다. - 왜 저장하나요?
- 전기가 나가도 데이터가 안 날아가게 하려고요. (Persistence)
- 백엔드 서버가 바빠서 못 받아가도, 쌓아뒀다가 나중에 주려고요. (Buffering)
- DB인가요?
- 엄밀히 말하면 DB가 아닙니다. 쿼리(
SELECT * WHERE name=...)를 못 날립니다. 그냥 순서대로 쌓아두는 “거대한 큐(Queue)”일 뿐입니다.
- 엄밀히 말하면 DB가 아닙니다. 쿼리(
2단계: 자바(Spring)의 행동 → DB로 이동함
- 행동:
- 카프카(물류센터)에서 물건(
StudyLogRequest)을 꺼내옵니다. (@KafkaListener) - 포장(
JSON)을 뜯어서 내용물을 확인합니다. - “이거 창고(MySQL)에 넣어야겠다!” 하고
repository.save()를 호출합니다.
- 카프카(물류센터)에서 물건(
-
증거 (아까 보신 로그):
Hibernate: select ... from study_session ...👉 이건 카프카가 한 게 아닙니다. 님이 짠 자바 코드가 “진짜 DB(MySQL)”에 쿼리를 날린 겁니다.
Bulk Insert 변경 부분
- 학습 로그를 받아올 때 카프카는 500개씩 받아옴
- 그러면? 우리는 그걸 일일이 자바 코드로 for문 돌려가며 saveLog 할 이유 없이
- repo.saveAll() 이라는 Bulk Insert 를 사용하는 것이 훨씬 효율적임
- 왜냐하면 우리는 실시간성이 0.1초 이런 식이 아니라 1시간 단위니까
StudyLog의 ID가 GENERATETYPE.IDENTITY라서 생기는 문제
- saveAll이 정상 동작하지 않는다
- IDENTITY 전략
- DB에 일단 넣어야만 PK 값을 알 수 있는 구조라서 건건이 Insert를 날릴 수 밖에 없게 됨
- 해결책
- JdbcTemplate를 사용해서 뭉쳐 넣기가 가능함
- JPA는 그대로 두고, Bulk Insert 전용 레포를 하나 더 만들기
JPA saveAll() + IDENTITY와 JdbcTemplate의 차이점
- JPA
saveAll()+IDENTITY:- 동작:
INSERT->SELECT LAST_INSERT_ID()->INSERT->SELECT...(500번 반복) - 성능: 최악 (네트워크 왕복 500번)
- 동작:
- JDBC
batchUpdate():- 동작: 메모리에 500개 쿼리 쌓음 -> “옛다 받아라!” 하고 1번 전송 (
INSERT INTO ... VALUES (...), (...), (...)) - 성능: 최상 (네트워크 왕복 1번)
- 주의:
rewriteBatchedStatements=true옵션이 URL에 꼭 있어야 드라이버가 이걸 해줍니다. (이미 추가하셨으니 OK!)
- 동작: 메모리에 500개 쿼리 쌓음 -> “옛다 받아라!” 하고 1번 전송 (
JPA saveAll할때랑 IDENTITY 전략이 충돌나는 이유
- MySQL의 IDENTITY (Auto Increment) 전략
- DB에 일단 넣어야만 ID(PK)를 알 수 있는 구조
- 하이버네이트는 ID 값을 알아오기 위해 어쩔 수 없이 건건이 INSERT를 날림
- 근데 save All - batch 처리 시
- ID를 몰라도 일단 영속성 컨택스트에 저장하려 함. 근데? IDENTITY라서 id값을 모르니 영속성 컨텍스트에 저장할 수가 없음 결국 INSERT 해서 ID 값을 알아와야 하는 불상사 발생
- 관련 코드
INSERT INTO study_log (...) VALUES (...);이걸 n개 반복하는 거임
그럼 왜 JdbcTemplate으로는 가능하냐?
- 영속성 컨텍스트 같이 PK값이 반드시 있어야 하는 1차 캐시를 관리하지 않음
- 따라서 객체에 ID를 끼워 넣든 말든 관심 없음
- 그래서 IDENTITY 전략이어도 MySQL 드라이버의
rewriteBatchedStatements기능을 이용해 한 방에 전송이 가능 - 관련 코드: 네트워크 왕복 단 1회! (이게 성능 차이의 핵심)
INSERT INTO study_log (...) VALUES (...), (...), (...), ... (500개);
Bulk Insert 이슈 요약
기존: for문 돌려서 일일이 엔티티 save: insert쿼리를 날림
JPA saveAll 로 bulk insert 시도: 근데 IDENTITY 전략 때문에 결국 기존과 같은 로직으로 돌아감
- 이건 영속성 컨텍스트에 저장을 해야하는데 그떄 ID가 필요해서 저장을 못함. 그래서 결국엔 JPA가 하나씩 Insert를 하는 전략을 취하게 됨
JdbcTemplate로 변경 시: PK 몰라도 뭉터기로 넘기기 가능
- 구조:
JdbcTemplate + rewriteBatchedStatements=true: - 이게 가능한 이유가 Jdbc는 1차 캐시를 관리하지 않기 때문에 가능
- 여기서 의문점 : 1차 캐시에 배치가 있어야지 데이터를 한번에 올릴텐데, 1차 캐시 자체가 없는건지, 있긴하지만 ID가 필요하지 않은 구조인건지
- JdbcTemplate에는 1차 캐시(영속성 컨텍스트)가 없는 대신에 PreparedStatement 내부에 단순한 파라미터 저장소(List)가 있어서 거기에 모아둘 수 있음
- 이 저장소는 ID(PK)가 있든 없든 상관없이 그냥 데이터를 쌓아두기만 하는 상자라고 이해하면 됨
그럼 왜 IDENTITY 전략을 쓰냐?
- “일반적인 상황에서는 최선이지만, 대량 데이터 처리에는 최악”
- IDNETITY 전략이 편한 이유
- SEQUENCE 전략을 쓰면 “시퀀스 객체를 따로 생성해야 하나?”, “할당 사이즈(Allocation Size)를 얼마로 잡아야 성능과 번호 순서가 안 꼬일까?” 같은 고민을 해야 하는데 IDENTITY는 그냥 DB에 맡기면 끝임
- 특히 MySQL 환경에서는 가장 대중적이고 기본 설정임
- DB 입장에서는 본인이 직접 PK 값을 매기니까 데이터 무결성을 지키기가 쉬움
- IDENTITY 전략이 불리한 대량 데이터의 기준은?
- 배치 작업/로그 저장: 한 번에 1,000건 ~ 10,000건 이상 저장 → IDENTITY 쓰면 체감 속도가 수 초 이상으로 늘어남 (JdbcTemplate 권장)
- 일반적인 API: 한 번 호출에 데이터 1~10건 저장 → IDENTITY 써도 무방 (0.01초 차이)