엔지니어링

Mar 29, 2024

엔지니어링

실제로 동작하는 Raft 구현체 뜯어 보기 - 1

  • 이규봉

    소프트웨어 엔지니어

Mar 29, 2024

엔지니어링

실제로 동작하는 Raft 구현체 뜯어 보기 - 1

  • 이규봉

    소프트웨어 엔지니어

이 글에선 독자들이 Raft에 관한 이론적인 배경지식이 있다고 가정하고, tikv/raft-rs 코드를 샅샅이 훑어 보며 어떻게 실제로 분산 시스템에서의 상태 머신이 동기화되고 동작하게 되는지 몇 개의 간략한 시나리오에 걸쳐 알아보겠습니다.

이 글은 raft-rs 코드 분석에 초점을 맞추고 있지만, raft-rs 구현체가 유연성을 위해 네트워크 및 스토리지 계층을 포함하지 않기 때문에 온전한 이해를 위해 일부 섹션에서 raftify 소스 코드를 예제로 사용합니다.

💡 raftify는 Lablup에서 개발한 하이레벨의 Raft 구현체입니다. 이 글에선 raftify에 대해선 raft의 동작 방식을 이해하기 위한 최소한의 코드만을 설명합니다. raftify에 대해 궁금하시다면 해당 포스팅을 참고해보세요.

그림 출처: https://github.com/tikv/raft-rs

타입을 중심으로 살펴보는 raft-rs 아키텍처

시나리오를 살펴 보기에 앞서 코드 베이스에서 사용되는 대표적인 타입들을 중심으로 아키텍쳐를 대략적으로 살펴봅시다.

Raft

각 Raft 노드들의 Raft 객체는 메시지 큐 msgs 를 메모리에 들고 있으며 이 큐를 통해 다른 Raft 노드들과 상호작용합니다.

raftify와 같은 하이레벨의 구현체에서 네트워크 계층은 후에 설명할 추상화 계층을 통해 이 큐에 메시지를 넣는 역할을 하게 됩니다.

따라서 이 메시지 큐는 통신의 엔드 포인트로 볼 수 있으며, Raft 구현체는 현재 상태에 따라 이 메시지들을 처리해 나가며 노드 간의 일관된 상태를 유지합니다.

이 Raft 노드의 상태에 해당하는 데이터들을 들고 있는 것이 RaftCore 타입입니다.

또한 다른 Raft 노드들과의 로그 엔트리들을 동기화하기 위한 메타 데이터들을 담는 Progress란 타입이 있으며, 이것들은 상황에 따라 ProgressTracker에서 적절하게 업데이트 됩니다.

결과적으로 Raft는 아래와 같은 타입이 됩니다.

// tikv/raft-rs/blob/master/src/raft.rs pub struct Raft<T: Storage> { pub msgs: Vec<Message>, pub r: RaftCore<T>, prs: ProgressTracker, }

RaftLog

RaftCore가 갖는 대표적인 데이터로 로그 엔트리 시퀸스에 대한 접근을 추상화하는 RaftLog가 있습니다.

RaftLog<T: Storage>UnstableT 타입을 함께 다룰 수 있도록 추상화합니다. 여기서 T는 raftify와 같은 보다 높은 레벨에서 구현해 넣어 주어야 하는 영속적인 스토리지에 해당하며, Unstable은 이 스토리지에 기록되기 전 거치는 버퍼입니다.

// tikv/raft-rs/blob/master/src/raft_log.rs pub struct RaftLog<T: Storage> { pub store: T, pub unstable: Unstable, ... }

💡 RaftCore 타입에 대해 더 궁금하다면 이 링크를 참고하세요.

Raft Loop

Raft 구현체들은 다른 Raft 노드들과 통신하며 일관된 상태를 유지하기 위해 무한 루프를 돌며 자신의 상태 머신을 업데이트 하는 반복적인 프로세스를 수행합니다. 이 글에선 이러한 루프를 Raft loop라고 부르겠습니다.

raftify에서 Raft loop를 구현하는 소스 코드는 아래와 같습니다.

(가장 minimal한 구현을 보고 싶다면 tikv/raft-rs의 예제 코드를 참고하실 수도 있습니다.)

// lablup/raftify/blob/main/raftify/src/raft_node/mod.rs async fn on_ready(&mut self) -> Result<()> { if !self.raw_node.has_ready() { return Ok(()); } let mut ready = self.raw_node.ready(); if !ready.messages().is_empty() { self.send_messages(ready.take_messages()).await; } if *ready.snapshot() != Snapshot::default() { slog::info!( self.logger, "Restoring state machine and snapshot metadata..." ); let snapshot = ready.snapshot(); if !snapshot.get_data().is_empty() { self.fsm.restore(snapshot.get_data().to_vec()).await?; } let store = self.raw_node.mut_store(); store.apply_snapshot(snapshot.clone())?; } self.handle_committed_entries(ready.take_committed_entries()) .await?; if !ready.entries().is_empty() { let entries = &ready.entries()[..]; let store = self.raw_node.mut_store(); store.append(entries)?; } if let Some(hs) = ready.hs() { let store = self.raw_node.mut_store(); store.set_hard_state(hs)?; } if !ready.persisted_messages().is_empty() { self.send_messages(ready.take_persisted_messages()).await; } let mut light_rd = self.raw_node.advance(ready); if let Some(commit) = light_rd.commit_index() { let store = self.raw_node.mut_store(); store.set_hard_state_commit(commit)?; } if !light_rd.messages().is_empty() { self.send_messages(light_rd.take_messages()).await; } self.handle_committed_entries(light_rd.take_committed_entries()) .await?; self.raw_node.advance_apply(); Ok(()) }

RawNode

각 Raft 노드들은 Raft 모듈을 포함하는 RawNode란 타입의 좀 더 하이레벨의 인스턴스를 갖습니다. RawNode는 메모리에만 유지되는 상태인 SoftState, 영속적인 스토리지에 저장되는 상태인 HardState와 아직 저장되지 않은 Ready의 메타 데이터들을 나타내는 records 필드를 갖고 있습니다.

💡 Ready란 Raft 노드를 업데이트 해야 할 필요가 있을 때 갱신되어야 할 데이터들을 한꺼번에 넘겨주는 자료구조입니다.

// tikv/raft-rs/blob/master/src/raw_node.rs pub struct RawNode<T: Storage> { pub raft: Raft<T>, prev_ss: SoftState, prev_hs: HardState, max_number: u64, records: VecDeque<ReadyRecord>, commit_since_index: u64, }

Raft loop의 첫 부분 ready 메서드가 호출될 때 Ready의 메타 데이터가 records에 저장되고, 저장되어야 하는 스냅샷, 엔트리 등이 모두 처리된 후 루프의 마지막 부분인 advance에서 commit_ready를 호출하며 버퍼 Unstable의 오프셋을 갱신합니다.

RaftNode

RaftNode는 raftify에서 RawNode를 네트워크, 스토리지 계층과 통합해 좀 더 높은 레벨에서 추상화하는 타입입니다.

raftify는 별개의 비동기 태스크에서 gRPC 클라이언트에서 보낸 메시지들을 수신하여, 채널을 통해 RaftNode.run 태스크에 이 메시지들을 넘겨줍니다.

메시지를 처리하고 난 후엔 on_ready란 이름의 함수(Raft loop)에서 상태 변경을 처리합니다.

// lablup/raftify/blob/main/raftify/src/raft_node/mod.rs pub async fn run(mut self) -> Result<()> { let mut tick_timer = Duration::from_secs_f32(self.config.tick_interval); let fixed_tick_timer = tick_timer; let mut now = Instant::now(); loop { ... tokio::select! { msg = timeout(fixed_tick_timer, self.server_rcv.recv()) => { if let Ok(Some(msg)) = msg { self.handle_server_request_msg(msg).await?; } } ... } let elapsed = now.elapsed(); now = Instant::now(); if elapsed > tick_timer { tick_timer = Duration::from_millis(100); self.raw_node.tick(); } else { tick_timer -= elapsed; } self.on_ready().await? } }

좀 더 raftify의 구현에 대해 자세히 설명해보자면 raftify는 아래와 같은 과정을 반복 처리합니다.

  1. 클라이언트에서 요청 생성. (예를 들어 RaftServiceClient.proposeRaftNode.propose를 호출)
  2. gRPC를 통해 원격 Raft 노드의 RaftServiceClient.propose가 호출됨.
  3. RaftServiceClient.propose가 채널을 통해 Propose 메시지를 RaftNode.run 코루틴으로 넘김.
  4. 메시지 큐를 폴링하던 RaftNode.runPropose 메시지가 들어오면 RawNode::propose 호출.
  5. 상태 머신에 적용되어야 하는 변경 사항이 생기면 Ready 인스턴스가 생성되어 on_ready 핸들러로 전달됨.
  6. 엔트리가 커밋되면 on_ready 핸들러에서 커밋된 엔트리들을 처리한 후 클라이언트에 응답함.

이론적인 내용들은 이쯤에서 마무리하고 시나리오 몇 개를 분석해보며 어떤 일들이 일어나는지 살펴봅시다.

💡 이 단락에서 Propose 메시지라고 임의로 칭한 것은 클러스터에 상태 변경을 제안하기 위한 목적으로 정의된 타입의 메시지입니다.

시나리오 분석

1 - 새 로그 엔트리 추가

상태 머신을 변경하기 위해 클러스터에 변경 사항을 요청하면 (propose) 내부에서 어떤 일이 일어날까요? 이 섹션에선 RawNode.propose를 호출했을 때 어떤 과정을 거치게 되는지 하나씩 분석해보겠습니다. RawNode.propose 함수를 살펴보면 아래와 같습니다.

// tikv/raft-rs/blob/master/src/raw_node.rs pub fn propose(&mut self, context: Vec<u8>, data: Vec<u8>) -> Result<()> { let mut m = Message::default(); m.set_msg_type(MessageType::MsgPropose); m.from = self.raft.id; let mut e = Entry::default(); e.data = data.into(); e.context = context.into(); m.set_entries(vec![e].into()); self.raft.step(m) }

위 코드를 통해 propose 함수는 step을 호출해 MsgPropose 타입의 메시지를 처리하도록 만드는 것을 알 수 있습니다.

여기서 step은 raft-rs의 실질적인 메시지 핸들러에 해당하는 함수입니다. step을 호출한 노드가 리더인 경우 step_leader, 팔로워인 경우 step_follower, 후보자인 경우 step_candidate가 호출됩니다.

step의 코드는 매우 복잡하지만 여기선 리더 노드에서 MsgPropose 타입이 어떻게 처리되는지 코드를 따라가며 살펴봅시다.

// tikv/raft-rs/blob/master/src/raft.rs fn step_leader(&mut self, mut m: Message) -> Result<()> { ... match m.get_msg_type() { MessageType::MsgPropose => { ... if !self.append_entry(m.mut_entries()) { ... } self.bcast_append(); return Ok(()); } ... } }

Raft.append_entryRaftLog.append를 호출해 엔트리들을 추가합니다. RaftLog.appendself.unstable.truncate_and_append에서 Unstable 버퍼에 추가된 엔트리들을 append 합니다.

// tikv/raft-rs/blob/master/src/raft_log.rs pub fn append(&mut self, ents: &[Entry]) -> u64 { ... self.unstable.truncate_and_append(ents); self.last_index() }

이전에 설명한 것과 같이 버퍼에 추가된 엔트리들은 Raft loop에서 persist 되고, advance류 함수를 통해 상태 머신을 갱신해 주면 자동으로 오프셋이 갱신되면서 버퍼가 클리어 될 것입니다.

그 다음으로 호출되는 bcast_append에 대해 살펴보도록 하겠습니다.

이전 섹션에서 설명한, 리더와 팔로워들의 로그 엔트리들을 동기화하기 위한 ProgressTracker (prs)를 통해 각 팔로워의 progress를 인자로 core.send_append를 호출하는 것을 볼 수 있습니다.

// tikv/raft-rs/blob/master/src/raft.rs pub fn bcast_append(&mut self) { let self_id = self.id; let core = &mut self.r; let msgs = &mut self.msgs; self.prs .iter_mut() .filter(|&(id, _)| *id != self_id) .for_each(|(id, pr)| core.send_append(*id, pr, msgs)); }

send_append는 아래와 같은 간략한 구조를 갖고 있습니다.

// tikv/raft-rs/blob/master/src/raft.rs fn send_append(&mut self, to: u64, pr: &mut Progress, msgs: &mut Vec<Message>) { self.maybe_send_append(to, pr, true, msgs); }

maybe_send_appendRaftLog.entries를 통해 pr.next_idx ~ to 범위의 로그 엔트리들을 읽어온 후 prepare_send_entries에 넘겨줍니다.

(maybe_란 이름의 접두사를 통해 유추해 볼 수 있듯이 해당 함수는 성공하면 true, 실패하면 false를 리턴합니다.)

// tikv/raft-rs/blob/master/src/raft.rs fn maybe_send_append( &mut self, to: u64, pr: &mut Progress, allow_empty: bool, msgs: &mut Vec<Message>, ) -> bool { ... let ents = self.raft_log.entries( pr.next_idx, self.max_msg_size, GetEntriesContext(GetEntriesFor::SendAppend { to, term: self.term, aggressively: !allow_empty, }), ); ... match (term, ents) { (Ok(term), Ok(mut ents)) => { if self.batch_append && self.try_batching(to, msgs, pr, &mut ents) { return true; } self.prepare_send_entries(&mut m, pr, term, ents) } ... } ... self.send(m, msgs); true }

prepare_send_entries는 메시지 객체 m을 MsgAppend 타입으로 만들고 엔트리들을 메시지에 넣어줍니다. 그 후 progress를 업데이트 해 준 후 리턴합니다.

// tikv/raft-rs/blob/master/src/raft.rs fn prepare_send_entries( &mut self, m: &mut Message, pr: &mut Progress, term: u64, ents: Vec<Entry>, ) { m.set_msg_type(MessageType::MsgAppend); m.index = pr.next_idx - 1; m.log_term = term; m.set_entries(ents.into()); m.commit = self.raft_log.committed; if !m.entries.is_empty() { let last = m.entries.last().unwrap().index; pr.update_state(last); } }

그리고 self.send(m, msgs)에서 이 준비한 메시지를 msgs 메시지 큐에 넣어 줍니다.

// tikv/raft-rs/blob/master/src/raft.rs fn send(&mut self, mut m: Message, msgs: &mut Vec<Message>) { ... msgs.push(m); }

메시지 큐에 들어간 MsgAppend 메시지는 네트워크 계층을 통해 send_messages에서 팔로워 노드로 전송되게 됩니다. 따라서, 우리는 팔로워 노드가 MsgAppend 메시지를 받은 후 어떻게 처리하는지를 봐야 합니다.

다음으로 팔로워 노드에서 일어나는 일을 살펴보면 아래와 같습니다. 팔로워 노드에서 MsgAppend 메시지를 수신했을 때 일어나는 일을 알아보려면 step_follower를 보면 됩니다.

// tikv/raft-rs/blob/master/src/raft.rs fn step_follower(&mut self, mut m: Message) -> Result<()> { match m.get_msg_type() { ... MessageType::MsgAppend => { self.election_elapsed = 0; self.leader_id = m.from; self.handle_append_entries(&m); } ... } }

위 코드를 통해 MsgAppend 메시지를 수신한 팔로워 노드가 handle_append_entries를 호출하고 있는 것을 알 수 있습니다.

이 함수는 아래처럼 MsgAppendResponse 타입의 메시지인 to_send를 만들고 RaftLog.maybe_append를 호출합니다.

// tikv/raft-rs/blob/master/src/raft.rs pub fn handle_append_entries(&mut self, m: &Message) { ... let mut to_send = Message::default(); to_send.to = m.from; to_send.set_msg_type(MessageType::MsgAppendResponse); if let Some((_, last_idx)) = self .raft_log .maybe_append(m.index, m.log_term, m.commit, &m.entries) { ... // MsgAppend 메시지를 수신 } else { ... // MsgAppend 메시지를 거절 } ... self.r.send(to_send, &mut self.msgs); }

이 함수는 아래처럼 match_term을 호출해 메시지의 logTerm과 로그 엔트리의 term 값이 같은지 확인하고, find_conflict를 호출해 로그 엔트리 시퀸스에 충돌이 있는지 검사한 후 문제가 없다고 판단하면 Raft.append를 호출합니다.

// tikv/raft-rs/blob/master/src/raft.rs pub fn maybe_append( &mut self, idx: u64, term: u64, committed: u64, ents: &[Entry], ) -> Option<(u64, u64)> { if self.match_term(idx, term) { let conflict_idx = self.find_conflict(ents); if conflict_idx == 0 { } else if conflict_idx <= self.committed { fatal!( self.unstable.logger, "entry {} conflict with committed entry {}", conflict_idx, self.committed ) } else { let start = (conflict_idx - (idx + 1)) as usize; self.append(&ents[start..]); if self.persisted > conflict_idx - 1 { self.persisted = conflict_idx - 1; } } let last_new_index = idx + ents.len() as u64; self.commit_to(cmp::min(committed, last_new_index)); return Some((conflict_idx, last_new_index)); } None }

우리는 이 함수를 본 적이 있습니다. 리더 노드에서 로그 엔트리가 제안되었을 때 RaftLog.append의 호출 전 마지막으로 호출된 함수였죠.

이전과 마찬가지로 Raft.append_entryRaftLog.append를 호출해 엔트리들을 추가합니다. RaftLog.appendself.unstable.truncate_and_append에서 Unstable 버퍼에 추가된 엔트리들을 append 합니다.

이것으로 리더에 추가된 로그가 리더 노드에 persist 되고 팔로워 노드에 복사되는 시나리오를 간략하게 알아보았습니다.

2 - 리더와 팔로워 노드 로그 시퀀스 불일치 시

우리는 시나리오 1에서 정상적인 상황을 가정하고 코드를 들여다보았습니다. 하지만 실제로는 네트워크 단절 등의 이슈로 리더 노드와 팔로워 노드에 불일치가 생길 수 있습니다. 이번엔 리더 노드와 팔로워 노드 사이에 불일치가 생겼을 때 이를 어떻게 감지하고 해소하는지를 중심으로 다시 한번 코드를 들여다보겠습니다.

3개의 노드로 이루어진 클러스터가 연속해서 상태 머신을 변경하는 수천 개의 요청을 처리하다가 네트워크 장애가 발생했다고 가정해봅시다.

장애가 발생한 경우 코드부터 보는 게 아니라 우선 노드들에 출력된 로그들과 persist된 로그 엔트리들, 디버깅 정보들을 먼저 들여다 보며 맥락을 파악하는 것부터 시작해야 하지만, 글이 지나치게 장황해지는 것을 피하기 위해 노드들에 어떤 일들이 발생하고 있는지 대략적으로 파악하게 해 줄 로그만 골라서 분석해보겠습니다.

우선 3번 노드에선 메시지를 reject 했음을 나타내는 rejected msgApp... 로그를 남기고 있습니다.

Nov 28 05:30:59.233 DEBG rejected msgApp [logterm: 7, index: 3641] from 2, logterm: Ok(0), index: 3641, from: 2, msg_index: 3641, msg_log_term: 7

위 로그를 통해 3번 노드는 팔로워 노드, 2번 노드가 장애가 발생한 후 새로 선출된 리더 노드이며 3641 번째 엔트리를 복제하려는 MsgAppend 메시지가 거절되었다는 것을 알 수 있습니다.

이 로그가 어떤 함수에서 출력된 것인지 찾아보면, 시나리오 1에서 살펴 보았었던 handle_append_entries에서 호출하는 것을 알 수 있는데요. (팔로워가 리더로부터 받은 MsgAppend 메시지를 처리하는 함수)

pub fn handle_append_entries(&mut self, m: &Message) { ... let mut to_send = Message::default(); to_send.to = m.from; to_send.set_msg_type(MessageType::MsgAppendResponse); ... if let Some((_, last_idx)) = self .raft_log .maybe_append(m.index, m.log_term, m.commit, &m.entries) { ... } else { debug!( self.logger, "rejected msgApp [logterm: {msg_log_term}, index: {msg_index}] \ from {from}", msg_log_term = m.log_term, msg_index = m.index, from = m.from; "index" => m.index, "logterm" => ?self.raft_log.term(m.index), ); let hint_index = cmp::min(m.index, self.raft_log.last_index()); let (hint_index, hint_term) = self.raft_log.find_conflict_by_term(hint_index, m.log_term); if hint_term.is_none() { fatal!( self.logger, "term({index}) must be valid", index = hint_index ) } to_send.index = m.index; to_send.reject = true; to_send.reject_hint = hint_index; to_send.log_term = hint_term.unwrap(); } to_send.set_commit(self.raft_log.committed); self.r.send(to_send, &mut self.msgs); }

함수를 살펴보면 이 로그가 출력되었다는 것에서 maybe_append가 None을 리턴했다는 것, 즉 match_term이 False를 반환했다는 것을 알 수 있습니다. 이것은 메시지의 logTerm과 3641번 엔트리의 term 값에 불일치가 발생했다는 것을 의미합니다.

따라서 term을 통해 충돌한 지점을 찾고 (find_conflict_by_term) 충돌한 지점(hint_index)을 메시지의 reject_hint에 넣어 리더에 MsgAppendResponse 메시지 형태로 되돌려 줍니다.

그럼 리더는 이 거절된 MsgAppendResponse 메시지를 어떻게 처리할까요?

메시지를 거절한 리더 노드는 아래와 같은 MsgAppend가 거절했다는 로그를 남기게 됩니다.

Nov 28 05:30:59.279 DEBG received msgAppend rejection, index: 3641, from: 3, reject_hint_term: 7, reject_hint_index: 3611

따라서 우리가 그 다음으로 들여다 보아야 하는 것은 이 거절된 MsgAppend 메시지를 받은 후 "received msgAppend rejection"를 출력하는 함수입니다.

이 함수는 handle_append_response인데요, 함수 자체는 꽤 길지만 MsgAppend가 reject 되었을 때의 처리만 잘라놓고 보면 그리 길지 않습니다.

fn handle_append_response(&mut self, m: &Message) { let mut next_probe_index: u64 = m.reject_hint; ... if m.reject { debug!( self.r.logger, "received msgAppend rejection"; "reject_hint_index" => m.reject_hint, "reject_hint_term" => m.log_term, "from" => m.from, "index" => m.index, ); if pr.maybe_decr_to(m.index, next_probe_index, m.request_snapshot) { debug!( self.r.logger, "decreased progress of {}", m.from; "progress" => ?pr, ); if pr.state == ProgressState::Replicate { pr.become_probe(); } self.send_append(m.from); } return; } ... }

메시지의 reject_hint를 가져와 next_probe_index로 만들고, Progress.maybe_decr_to를 호출해 progress를 감소시킵니다. Progress가 Probe 상태임을 표시하고, send_append를 호출해 다시 MsgAppend 메시지를 보내줍니다.

💡 ProgressState는 각 노드들의 동기화 진행 상태를 나타내는 enum 입니다. 정상적인 상황, 로그를 복제하고 있는 상태에선 "Replicate" 이며, 복제된 마지막 인덱스를 모르고 있는 팔로워 노드는 조사 중인 상태란 의미로 "Probe", 스냅샷 전송을 통해 팔로워에 로그를 복제 중인 경우 "Snapshot"입니다.

요약하자면 충돌이 발생하기 전 로그 엔트리의 인덱스 (next_probe_index)를 찾기 위해 해당 노드의 progress를 감소시키고 다시 MsgAppend 메시지를 보낸다는 것입니다. 이 과정은 리더와 팔로워 노드의 Common log prefix를 찾게 될 때까지 반복됩니다.

Common log prefix를 찾게 되면, 해당 인덱스 이후의 로그 엔트리들은 리더로부터 팔로워로 단방향으로 복제되어 덮어쓰게 됩니다. 이 과정은 maybe_send_append 함수에서 확인할 수 있습니다.

아래와 같이 RaftLog.entries를 통해 얻어진 로그 엔트리들이 SendAppend 컨텍스트로 복제됩니다. 이때 max_msg_size는 Config의 max_size_per_msg이며, 이 값의 디폴트 값은 0입니다. RaftLog.entries를 통해 LMDBStorage.entries의 (RaftLog의 T에 해당하는 persistent 스토리지 타입) 인자의 max_size0이 주어지는데, 이 주석을 토대로 생각해보면 이것의 의미는 따로 설정해 주지 않으면 리더와 팔로워 노드의 로그에 불일치가 발생했을 때 로그 엔트리를 한 개씩 동기화하란 의미임을 알 수 있습니다.

이후엔 이전 섹션에서 설명한 것과 같이 prepare_send_entries를 통해 MsgAppend 메시지를 준비하고, Raft.send를 통해 팔로워 노드로 엔트리를 복제하기 위한 메시지가 전달됩니다.

// tikv/raft-rs/blob/master/src/raft.rs fn maybe_send_append( &mut self, to: u64, pr: &mut Progress, allow_empty: bool, msgs: &mut Vec<Message>, ) -> bool { ... let mut m = Message::default(); m.to = to; if pr.pending_request_snapshot != INVALID_INDEX { ... } else { let ents = self.raft_log.entries( pr.next_idx, self.max_msg_size, GetEntriesContext(GetEntriesFor::SendAppend { to, term: self.term, aggressively: !allow_empty, }), ); ... let term = self.raft_log.term(pr.next_idx - 1); match (term, ents) { (Ok(term), Ok(mut ents)) => { if self.batch_append && self.try_batching(to, msgs, pr, &mut ents) { return true; } self.prepare_send_entries(&mut m, pr, term, ents) } ... } } self.send(m, msgs); true }

중간에 많은 로그들이 생략되어 있지만 리더와 팔로워 사이에 3612 번째 엔트리부터 3642번째 엔트리 까지 위와 같은 과정을 거쳐 동기화가 일어난 후, 팔로워 노드로의 복제가 모두 끝나면 해당 팔로워의 progress stateReplicate로 변하며, 정상적으로 Heartbeat 메시지를 주고 받기 시작하는 것을 알 수 있습니다.

2023-11-28 14:30:59,269 - INFO - Entries [3612, 3643) requested Nov 28 05:30:59.269 DEBG Sending from 2 to 3, msg: Message { msg_type: MsgAppend, to: 3, from: 0, term: 0, log_term: 7, index: 3611, entries: [Entry { context: "1810", data: "{'key': '2292', 'value': '1'}", entry_type: EntryNormal, index: 3612, sync_log: false, term: 7 }], commit: 3642, commit_term: 0, snapshot: Snapshot { data: "None", metadata: None }, request_snapshot: 0, reject: false, reject_hint: 0, context: "None", deprecated_priority: 0, priority: 0 }, to: 3, from: 2 2023-11-28 14:30:59,259 - INFO - Entries [3613, 3643) requested Nov 28 05:30:59.269 DEBG Sending from 2 to 3, msg: Message { msg_type: MsgAppend, to: 3, from: 0, term: 0, log_term: 7, index: 3612, entries: [Entry { context: "1811", data: "{'key': '2294', 'value': '1'}", entry_type: EntryNormal, index: 3613, sync_log: false, term: 7 }], commit: 3642, commit_term: 0, snapshot: Snapshot { data: "None", metadata: None }, request_snapshot: 0, reject: false, reject_hint: 0, context: "None", deprecated_priority: 0, priority: 0 }, to: 3, from: 2 2023-11-28 14:30:59,259 - INFO - Entries [3614, 3643) requested Nov 28 05:30:59.269 DEBG Sending from 2 to 3, msg: Message { msg_type: MsgAppend, to: 3, from: 0, term: 0, log_term: 7, index: 3613, entries: [Entry { context: "1812", data: "{'key': '2295', 'value': '1'}", entry_type: EntryNormal, index: 3614, sync_log: false, term: 7 }], commit: 3642, commit_term: 0, snapshot: Snapshot { data: "None", metadata: None }, request_snapshot: 0, reject: false, reject_hint: 0, context: "None", deprecated_priority: 0, priority: 0 }, to: 3, from: 2 2023-11-28 14:30:59,259 - INFO - Entries [3615, 3643) requested Nov 28 05:30:59.269 DEBG Sending from 2 to 3, msg: Message { msg_type: MsgAppend, to: 3, from: 0, term: 0, log_term: 7, index: 3614, entries: [Entry { context: "1813", data: "{'key': '2296', 'value': '1'}", entry_type: EntryNormal, index: 3615, sync_log: false, term: 7 }], commit: 3642, commit_term: 0, snapshot: Snapshot { data: "None", metadata: None }, request_snapshot: 0, reject: false, reject_hint: 0, context: "None", deprecated_priority: 0, priority: 0 }, to: 3, from: 2 ... 2023-11-28 14:30:59,284 - INFO - Entries [3641, 3643) requested Nov 28 05:30:59.283 DEBG Sending from 2 to 3, msg: Message { msg_type: MsgAppend, to: 3, from: 0, term: 0, log_term: 7, index: 3640, entries: [Entry { context: "1839", data: "{'key': '2457', 'value': '1'}", entry_type: EntryNormal, index: 3641, sync_log: false, term: 7 }], commit: 3642, commit_term: 0, snapshot: Snapshot { data: "None", metadata: None }, request_snapshot: 0, reject: false, reject_hint: 0, context: "None", deprecated_priority: 0, priority: 0 }, to: 3, from: 2 2023-11-28 14:30:59,284 - INFO - Entries [3642, 3643) requested Nov 28 05:30:59.284 DEBG Sending from 2 to 3, msg: Message { msg_type: MsgAppend, to: 3, from: 0, term: 0, log_term: 7, index: 3641, entries: [Entry { context: "None", data: "None", entry_type: EntryNormal, index: 3642, sync_log: false, term: 12 }], commit: 3642, commit_term: 0, snapshot: Snapshot { data: "None", metadata: None }, request_snapshot: 0, reject: false, reject_hint: 0, context: "None", deprecated_priority: 0, priority: 0 }, to: 3, from: 2 Nov 28 05:31:01.635 DEBG Sending from 2 to 1, msg: Message { msg_type: MsgHeartbeat, to: 1, from: 0, term: 0, log_term: 0, index: 0, entries: [], commit: 3642, commit_term: 0, snapshot: Snapshot { data: "None", metadata: None }, request_snapshot: 0, reject: false, reject_hint: 0, context: "None", deprecated_priority: 0, priority: 0 }, to: 1, from: 2 Nov 28 05:31:01.635 DEBG Sending from 2 to 3, msg: Message { msg_type: MsgHeartbeat, to: 3, from: 0, term: 0, log_term: 0, index: 0, entries: [], commit: 3642, commit_term: 0, snapshot: Snapshot { data: "None", metadata: None }, request_snapshot: 0, reject: false, reject_hint: 0, context: "None", deprecated_priority: 0, priority: 0 }, to: 3, from: 2 2023-11-28 14:31:01,637

3 - 리더 선출

시나리오 2에서 네트워크 장애로 인해 리더 선출이 일어났었던 것을 term 값의 증가를 통해 확인할 수 있었는데요, 이 시나리오에선 이 리더 선출 과정에 대해 자세히 들여다보도록 하겠습니다.

리더에 장애가 생긴 경우 어떤 로그들이 찍히게 되는지 확인하기 위해 간단하게 3개의 노드로 이뤄진 클러스터를 만들고 리더 프로세스를 강제로 종료시켜 본 후 새로 리더로 선출되는 프로세스의 로그를 들여다보겠습니다.

로그의 내용을 요약해보자면 리더 노드가 종료된 후, 3번 노드에서 선거를 시작하고 후보자(Candidate) 상태로 전이한 후 다른 voter들에게 MsgRequestVote 메시지를 보냅니다. 2번 노드로부터 MsgRequestVoteResponse 메시지를 받고, 자신은 본인에게 투표하기 때문에 과반수 이상의 투표를 받게 되어 새 리더로 선출된 후 term 값을 2로 증가시키고 자신이 리더로 선출되었음을 알리기 위한 특수한 종류의 메시지(Empty MsgAppend)를 보내는 과정이라고 요약할 수 있습니다.

💡 election_tick 만큼 heartbeat 메시지를 받지 못한 팔로워 노드가 투표를 시작하게 됩니다. 이 때 투표 분열(Split vote)을 방지하기 위해 election_tick은 매번 min_election_tick ~ max_election_tick 사이에서 무작위 값으로 결정됩니다. 따라서 리더 노드가 종료된 후 나머지 두 노드들 중 어떤 노드라도 리더 노드가 될 수 있으며 이는 더 작은 election_tick을 가진 노드로 선출됩니다.

Nov 29 01:30:30.210 INFO starting a new election, term: 1 Nov 29 01:30:30.210 DEBG reset election timeout 16 -> 10 at 0, election_elapsed: 0, timeout: 10, prev_timeout: 16 Nov 29 01:30:30.210 INFO became candidate at term 2, term: 2 Nov 29 01:30:30.210 DEBG Sending from 3 to 1, msg: Message { msg_type: MsgRequestVote, to: 1, from: 0, term: 2, log_term: 1, index: 3, entries: [], commit: 3, commit_term: 1, snapshot: Snapshot { data: "None", metadata: None }, request_snapshot: 0, reject: false, reject_hint: 0, context: "None", deprecated_priority: 0, priority: 0 }, to: 1, from: 3 Nov 29 01:30:30.210 DEBG Sending from 3 to 2, msg: Message { msg_type: MsgRequestVote, to: 2, from: 0, term: 2, log_term: 1, index: 3, entries: [], commit: 3, commit_term: 1, snapshot: Snapshot { data: "None", metadata: None }, request_snapshot: 0, reject: false, reject_hint: 0, context: "None", deprecated_priority: 0, priority: 0 }, to: 2, from: 3 Nov 29 01:30:30.211 INFO broadcasting vote request, to: [1, 2], log_index: 3, log_term: 1, term: 2, type: MsgRequestVote 2023-11-29 10:30:30,217 - WARNING - Failed to connect to node 1 elapsed from first failure: 0.0000s. Err message: <AioRpcError of RPC that terminated with: status = StatusCode.UNAVAILABLE details = "failed to connect to all addresses; last error: UNKNOWN: ipv4:127.0.0.1:60061: Failed to connect to remote host: Connection refused" debug_error_string = "UNKNOWN:failed to connect to all addresses; last error: UNKNOWN: ipv4:127.0.0.1:60061: Failed to connect to remote host: Connection refused {created_time:"2023-11-29T10:30:30.216855+09:00", grpc_status:14}" > 2023-11-29 10:30:30,222 - DEBUG - Node 3 received Raft message from the node 2, Message: Message { msg_type: MsgRequestVoteResponse, to: 3, from: 2, term: 2, log_term: 0, index: 0, entries: [], commit: 0, commit_term: 0, snapshot: Snapshot { data: "None", metadata: Some(SnapshotMetadata { conf_state: Some(ConfState { voters: [], learners: [], voters_outgoing: [], learners_next: [], auto_leave: false }), index: 0, term: 0 }) }, request_snapshot: 0, reject: false, reject_hint: 0, context: "None", deprecated_priority: 0, priority: 0 } Nov 29 01:30:30.223 INFO received votes response, term: 2, type: MsgRequestVoteResponse, approvals: 2, rejections: 0, from: 2, vote: true Nov 29 01:30:30.223 TRCE ENTER become_leader Nov 29 01:30:30.223 DEBG reset election timeout 10 -> 17 at 0, election_elapsed: 0, timeout: 17, prev_timeout: 10 Nov 29 01:30:30.223 TRCE Entries being appended to unstable list, ents: Entry { context: "None", data: "None", entry_type: EntryNormal, index: 4, sync_log: false, term: 2 } Nov 29 01:30:30.223 INFO became leader at term 2, term: 2 Nov 29 01:30:30.223 TRCE EXIT become_leader Nov 29 01:30:30.223 DEBG Sending from 3 to 1, msg: Message { msg_type: MsgAppend, to: 1, from: 0, term: 0, log_term: 1, index: 3, entries: [Entry { context: "None", data: "None", entry_type: EntryNormal, index: 4, sync_log: false, term: 2 }], commit: 3, commit_term: 0, snapshot: Snapshot { data: "None", metadata: None }, request_snapshot: 0, reject: false, reject_hint: 0, context: "None", deprecated_priority: 0, priority: 0 }, to: 1, from: 3 Nov 29 01:30:30.223 DEBG Sending from 3 to 2, msg: Message { msg_type: MsgAppend, to: 2, from: 0, term: 0, log_term: 1, index: 3, entries: [Entry { context: "None", data: "None", entry_type: EntryNormal, index: 4, sync_log: false, term: 2 }], commit: 3, commit_term: 0, snapshot: Snapshot { data: "None", metadata: None }, request_snapshot: 0, reject: false, reject_hint: 0, context: "None", deprecated_priority: 0, priority: 0 }, to: 2, from: 3

그럼 이제 로그 내용을 바탕으로 코드에서 어떤 일들이 벌어지고 있는지 알아 봅시다.

우선 "starting a new election" 이라는 로그를 출력하고 있는 함수는 hup 입니다.

hupstepMsgHup 타입, step_followerMsgTimeoutNow 타입의 메시지에 대한 처리 과정에서 호출됩니다.

여기서 MsgTimeoutNow 메시지는 Leader election이 아닌, Leader transfer에 사용되는 메시지 타입입니다. 즉 리더가 MsgTransferLeader 메시지를 받게 되면 팔로워들에게 MsgTimeoutNow 타입의 메시지를 전송하게 되고 transfer_leader 플래그를 True로 둔 채 hup 함수가 실행되게 됩니다. Leader election은 리더 장애 등의 상황으로 리더를 새로 선출하는 과정이지만, Leader transfer은 리더 프로세스가 다른 팔로워 프로세스에게 리더를 양도하는 과정입니다.

그러므로 우리가 지금 따라가보아야 하는 메시지는 MsgHup 임을 알 수 있습니다. election_tick이 지났는데도 Heartbeat를 받지 못했기 때문에 리더 선출을 시작했다는 점을 통해 MsgHup 메시지를 넣어준 것이 아래 tick_election 함수인 것을 추측해 볼 수 있습니다.

RaftNode에서 tick_timer마다 self.raw_node.tick()을 호출했던 것을 기억하시나요? 이 RawNode.tick을 통해 노드가 election_elapsedrandomized_election_timeout를 경과한 경우 자기 자신에게 MsgHup 메시지를 step 하게 되는 것입니다. (여기서 election_elapsed을 랜덤화하는 것은 모든 노드가 동시에 투표를 시작해 모든 노드가 자기 자신에게 투표하는 상황을 방지하기 위한 것입니다.)

// raw_node.rs pub fn tick(&mut self) -> bool { self.raft.tick() } // raft.rs pub fn tick(&mut self) -> bool { match self.state { StateRole::Follower | StateRole::PreCandidate | StateRole::Candidate => { self.tick_election() } StateRole::Leader => self.tick_heartbeat(), } } // raft.rs pub fn tick_election(&mut self) -> bool { self.election_elapsed += 1; if !self.pass_election_timeout() || !self.promotable { return false; } self.election_elapsed = 0; let m = new_message(INVALID_ID, MessageType::MsgHup, Some(self.id)); let _ = self.step(m); true } // raft.rs pub fn step(&mut self, m: Message) -> Result<()> { ... match m.get_msg_type() { ... MessageType::MsgHup => { self.hup(false) }, } } // raft.rs pub fn pass_election_timeout(&self) -> bool { self.election_elapsed >= self.randomized_election_timeout }

hup 함수는 간단하게 요약해보자면 아래처럼 campaign 함수를 CAMPAIGN_ELECTION 타입으로 실행합니다.

// tikv/raft-rs/blob/master/src/raft.rs fn hup(&mut self, transfer_leader: bool) { ... info!( self.logger, "starting a new election"; "term" => self.term, ); ... self.campaign(CAMPAIGN_ELECTION); }

campaign 함수는 아래처럼 자신의 상태를 Candidate 상태로 전이시킨 후 투표를 시작합니다.

우선 self_id는 이름대로 노드 자신의 id입니다. 따라서 self.poll(self_id, vote_msg, true)는 자기 자신에게 투표한다는 의미입니다.

이 결과가 VoteResult::Won인 경우 그대로 투표에서 승리하며 노드 본인이 리더가 되고 리턴합니다.

따라서 MsgRequestVote, MsgRequestVoteResponse 등의 메시지는 싱글 노드 클러스터에서 오고 가지 않을 것임을 알 수 있습니다.

하지만 물론 이 시나리오는 싱글 노드 클러스터가 아니기 때문에 경우에 해당하지 않습니다.

// tikv/raft-rs/blob/master/src/raft.rs pub fn campaign(&mut self, campaign_type: &'static [u8]) { let (vote_msg, term) = if campaign_type == CAMPAIGN_PRE_ELECTION { ... } else { self.become_candidate(); (MessageType::MsgRequestVote, self.term) }; let self_id = self.id; if VoteResult::Won == self.poll(self_id, vote_msg, true) { // We won the election after voting for ourselves (which must mean that // this is a single-node cluster). return; } ... }

campaign의 뒷부분을 더 들여다보기 전에 poll은 어떻게 동작하는 것인지 알아봅시다.

poll은 아래처럼 record_vote, tally_votes를 호출하는 함수이며, 투표 결과에 따라 투표에서 승리했다면 리더 노드로 전이한 후, 자신이 클러스터의 새 리더라는 것을 브로드캐스팅 (bcast_append) 합니다.

투표에서 진 경우 팔로워 노드로 전이하며, 결과가 Pending인 경우 아무일도 수행하지 않고 리턴합니다.

// tikv/raft-rs/blob/master/src/raft.rs fn poll(&mut self, from: u64, t: MessageType, vote: bool) -> VoteResult { self.prs.record_vote(from, vote); let (gr, rj, res) = self.prs.tally_votes(); if from != self.id { info!( self.logger, "received votes response"; "vote" => vote, "from" => from, "rejections" => rj, "approvals" => gr, "type" => ?t, "term" => self.term, ); } match res { VoteResult::Won => { if self.state == StateRole::PreCandidate { self.campaign(CAMPAIGN_ELECTION); } else { self.become_leader(); self.bcast_append(); } } VoteResult::Lost => { let term = self.term; self.become_follower(term, INVALID_ID); } VoteResult::Pending => (), } res }

record_vote의 역할은 아주 단순합니다. id 값을 가진 노드가 자기 자신에게 투표했을 때 ProgressTracker의 해시맵 객체 votes에 기록하는 함수입니다.

// tikv/raft-rs/blob/master/src/tracker.rs pub fn record_vote(&mut self, id: u64, vote: bool) { self.votes.entry(id).or_insert(vote); }

tally_votes를 봅시다. 해시맵 votes를 통해 자기 자신에게 투표한 노드의 수와 거절한 노드의 수를 세서 튜플 형태로 리턴해주고 있는 것을 볼 수 있습니다.

💡 "tally"라는 단어는 점수를 세거나 집계하는 행위를 의미합니다. 즉 "tally_votes"는 투표를 세서 집계하는 함수입니다.

// tikv/raft-rs/blob/master/src/tracker.rs pub fn tally_votes(&self) -> (usize, usize, VoteResult) { let (mut granted, mut rejected) = (0, 0); for (id, vote) in &self.votes { if !self.conf.voters.contains(*id) { continue; } if *vote { granted += 1; } else { rejected += 1; } } let result = self.vote_result(&self.votes); (granted, rejected, result) }

투표 결과를 어떻게 판단하는지 들여다볼까요?

조인트 쿼럼의 경우 두 쿼럼 (Incoming quorum, Outgoing quorum)의 동의를 모두 얻어야 투표에서 승리할 수 있습니다.

따라서 우리는 아래 세 vote_result 함수를 들여다봐야 합니다.

tracker.rs에선 해시맵 votes를 통해 노드 id가 자신에게 투표했는지 알 수 있게 해 주는 콜백 함수 check를 인자로 넘겨 줍니다.

joint.rs에선 두 구성에서 모두 승리한 경우에만 VoteResult::Won를 리턴하고, 한 쪽에서라도 투표에서 졌다면 VoteResult::Lost를 리턴합니다. 그 외의 경우인 경우 VoteResult::Pending를 리턴합니다.

득표 수를 실제로 카운트하는 작업은 majority.rsvote_result에서 진행됩니다.

클러스터의 voter들 중 자기 자신에게 투표한 노드의 수와 투표하지 않은 노드의 수를 세서 과반수보다 많은 노드들이 동의한 경우 VoteResult::Won, 과반수 이상의 투표를 얻지 못했지만 응답을 보내주지 못한 노드까지 포함시켰을 때 과반수를 넘는다면 VoteResult::Pending, 그 이외의 경우 VoteResult::Lost를 반환합니다.

// tracker.rs pub fn vote_result(&self, votes: &HashMap<u64, bool>) -> VoteResult { self.conf.voters.vote_result(|id| votes.get(&id).cloned()) } // joint.rs pub fn vote_result(&self, check: impl Fn(u64) -> Option<bool>) -> VoteResult { let i = self.incoming.vote_result(&check); let o = self.outgoing.vote_result(check); match (i, o) { // It won if won in both. (VoteResult::Won, VoteResult::Won) => VoteResult::Won, // It lost if lost in either. (VoteResult::Lost, _) | (_, VoteResult::Lost) => VoteResult::Lost, // It remains pending if pending in both or just won in one side. _ => VoteResult::Pending, } } // majority.rs pub fn vote_result(&self, check: impl Fn(u64) -> Option<bool>) -> VoteResult { ... let (mut yes, mut missing) = (0, 0); for v in &self.voters { match check(*v) { Some(true) => yes += 1, None => missing += 1, _ => (), } } let q = crate::majority(self.voters.len()); if yes >= q { VoteResult::Won } else if yes + missing >= q { VoteResult::Pending } else { VoteResult::Lost } } // util.rs pub fn majority(total: usize) -> usize { (total / 2) + 1 }

투표 과정이 votes 해시맵을 기반으로 어떻게 진행되는지 살펴보았습니다. 하지만 이 과정을 밟기 전 MsgRequestVote, MsgRequestVoteResponse 메시지를 통해 이 해시맵이 적절하게 업데이트 되어야 합니다.

따라서 campaign 함수를 계속 따라가 보도록 합시다.

campaign 함수가 MsgRequestVote 타입의 메시지를 만들어 voter들에게 전송하고 있다는 것을 알 수 있습니다.

따라서 그 다음으론 MsgRequestVote 메시지의 핸들러를 따라가 봅시다.

// tikv/raft-rs/blob/master/src/raft.rs pub fn campaign(&mut self, campaign_type: &'static [u8]) { let (vote_msg, term) = if campaign_type == CAMPAIGN_PRE_ELECTION { ... } else { self.become_candidate(); (MessageType::MsgRequestVote, self.term) }; let self_id = self.id; if VoteResult::Won == self.poll(self_id, vote_msg, true) { // We won the election after voting for ourselves (which must mean that // this is a single-node cluster). return; } // Only send vote request to voters. for id in self.prs.conf().voters().ids().iter() { if id == self_id { continue; } ... let mut m = new_message(id, vote_msg, None); m.term = term; m.index = self.raft_log.last_index(); m.log_term = self.raft_log.last_term(); m.commit = commit; m.commit_term = commit_term; ... self.r.send(m, &mut self.msgs); } ... }

얼핏 보면 복잡해 보이지만 결국 MsgRequestVote의 핸들러가 하는 일은 이 투표에 동의하거나 동의하지 않는다는 메시지를 만들어 전송해주는 일입니다.

vote_resp_msg_type에 따라 우리가 보낸 타입은 MsgRequestVote이므로 응답 메시지의 타입은 MsgRequestVoteResponse가 될 것입니다. (이 글에선 prevote 알고리즘에 대한 설명은 생략합니다)

그럼 노드가 언제 투표에 동의하고 언제 동의하지 않는지 살펴봅시다. 주석과 함께 코드를 찬찬히 살펴보면 투표에 동의하기 위해선 아래 세 조건이 만족되어야 함을 알 수 있습니다.

  1. can_votetrue (이미 해당 노드에 투표한 경우이거나, 이번 term에서의 leader_id를 모르고 아직 투표 하지 않은 경우)

  2. self.raft_log.is_up_to_datetrue (메시지의 term 값이 RaftLog.last_term 보다 크거나, 만약 같다면 RaftLog.last_index보다 메시지의 인덱스가 큰 경우)

  3. 메시지의 인덱스가 RaftLog.last_index 보다 크거나, 더 높은 우선 순위를 갖는 경우

이 세 조건이 만족된 경우 Vote에 동의하며 만족되지 않는 조건이 있다면 Vote를 거절한다는 메시지를 보냅니다.

그럼 이제 MsgRequestVoteResponse의 수신부로 넘어가봅시다.

// raft.rs pub fn step(&mut self, m: Message) -> Result<()> { ... match m.get_msg_type() { MessageType::MsgRequestVote => { // We can vote if this is a repeat of a vote we've already cast... let can_vote = (self.vote == m.from) || // ...we haven't voted and we don't think there's a leader yet in this term... (self.vote == INVALID_ID && self.leader_id == INVALID_ID) // ...and we believe the candidate is up to date. if can_vote && self.raft_log.is_up_to_date(m.index, m.log_term) && (m.index > self.raft_log.last_index() || self.priority <= get_priority(&m)) { self.log_vote_approve(&m); let mut to_send = new_message(m.from, vote_resp_msg_type(m.get_msg_type()), None); to_send.reject = false; to_send.term = m.term; self.r.send(to_send, &mut self.msgs); if m.get_msg_type() == MessageType::MsgRequestVote { // Only record real votes. self.election_elapsed = 0; self.vote = m.from; } } else { self.log_vote_reject(&m); let mut to_send = new_message(m.from, vote_resp_msg_type(m.get_msg_type()), None); to_send.reject = true; to_send.term = self.term; let (commit, commit_term) = self.raft_log.commit_info(); to_send.commit = commit; to_send.commit_term = commit_term; self.r.send(to_send, &mut self.msgs); self.maybe_commit_by_vote(&m); } } } } // raft.rs pub fn vote_resp_msg_type(t: MessageType) -> MessageType { match t { MessageType::MsgRequestVote => MessageType::MsgRequestVoteResponse, MessageType::MsgRequestPreVote => MessageType::MsgRequestPreVoteResponse, _ => panic!("Not a vote message: {:?}", t), } } // raft_log.rs pub fn is_up_to_date(&self, last_index: u64, term: u64) -> bool { term > self.last_term() || (term == self.last_term() && last_index >= self.last_index()) }

MsgRequestVoteResponse 메시지 핸들러는 매우 단순합니다!

우리가 아까 봤었던 poll 함수를 호출하여 votes 해시맵을 업데이트 하고 투표 결과가 결정된 경우 StateRole을 업데이트 합니다.

// tikv/raft-rs/blob/master/src/raft.rs fn step_candidate(&mut self, m: Message) -> Result<()> { match m.get_msg_type() { ... MessageType::MsgRequestVoteResponse => { ... self.poll(m.from, m.get_msg_type(), !m.reject); self.maybe_commit_by_vote(&m); } } }

정리

이 글에선 raft-rs에서 사용되는 타입들을 바탕으로 코드 아키텍쳐를 살펴본 후, 세 가지 기초적인 시나리오를 바탕으로 raft 구현체의 코드를 따라가며 분석해보았습니다. 이 글이 raft 모듈에 대한 이해를 넓히는데 도움이 되었기를 바랍니다. 다음 글에선 좀 더 다양한 시나리오들을 통해 raft 구현체의 작동 방식을 보다 깊이 살펴보도록 하겠습니다.

감사합니다 😊

도움이 필요하신가요?

내용을 작성해 주시면 곧 연락 드리겠습니다.

문의하기

Headquarter & HPC Lab

서울특별시 강남구 선릉로100길 34 남영빌딩 4층, 5층

© Lablup Inc. All rights reserved.