d2d_rendezvous.rs 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213
  1. //! Example for usage of the Connection Rendezvous Protocol state machine, using MPSC channels to
  2. //! simulate paths.
  3. #![expect(unused_crate_dependencies, reason = "Example triggered false positive")]
  4. use core::time::Duration;
  5. use std::{
  6. sync::mpsc::{self, RecvTimeoutError},
  7. thread,
  8. time::Instant,
  9. };
  10. use anyhow::Context as _;
  11. use data_encoding::HEXLOWER;
  12. use libthreema::{
  13. d2d_rendezvous::{
  14. AuthenticationKey, OutgoingFrame, PathProcessResult, PathStateUpdate, RendezvousProtocol,
  15. },
  16. utils::logging::init_stderr_logging,
  17. };
  18. use tracing::{Level, info, trace, trace_span, warn};
  19. struct Keys;
  20. impl Keys {
  21. const AK: [u8; 32] = [
  22. 0x1, 0x1, 0x1, 0x1, 0x1, 0x1, 0x1, 0x1, 0x1, 0x1, 0x1, 0x1, 0x1, 0x1, 0x1, 0x1, 0x1, 0x1, 0x1, 0x1,
  23. 0x1, 0x1, 0x1, 0x1, 0x1, 0x1, 0x1, 0x1, 0x1, 0x1, 0x1, 0x1,
  24. ];
  25. }
  26. fn process_incoming_frame(
  27. protocol: &mut RendezvousProtocol,
  28. pid: u32,
  29. incoming_frame: &OutgoingFrame,
  30. ) -> anyhow::Result<Option<PathProcessResult>> {
  31. let (header, payload) = incoming_frame.encode();
  32. if let Some(nominated_pid) = protocol.nominated_path()
  33. && pid != nominated_pid
  34. {
  35. warn!(
  36. pid,
  37. ?incoming_frame,
  38. "Discarding chunk for unknown or dropped path"
  39. );
  40. return Ok(None);
  41. }
  42. // Process incoming frame
  43. protocol
  44. .add_chunks(pid, &[header.as_slice(), payload])
  45. .context("Failed to add chunk")?;
  46. let result = protocol.process_frame(pid).context("Failed to process frame")?;
  47. Ok(result)
  48. }
  49. #[expect(clippy::needless_pass_by_value, reason = "Prevent re-use")]
  50. fn run_protocol(
  51. mut protocol: RendezvousProtocol,
  52. initial_outgoing_frames: Vec<(u32, OutgoingFrame)>,
  53. tx: mpsc::Sender<(u32, OutgoingFrame)>,
  54. rx: mpsc::Receiver<(u32, OutgoingFrame)>,
  55. ) -> anyhow::Result<()> {
  56. // Send initial frames
  57. for outgoing_frame in initial_outgoing_frames {
  58. tx.send(outgoing_frame)?;
  59. }
  60. // Nomination loop where we run the handshakes simultaneously over all available paths until we
  61. // have nominated one path.
  62. info!("Entering nomination loop");
  63. let (nominated_pid, rph) = 'nomination: loop {
  64. // Receive and process incoming frame
  65. let (pid, incoming_frame) = rx.recv().context("Failed to receive incoming frame")?;
  66. let mut maybe_result = process_incoming_frame(&mut protocol, pid, &incoming_frame)?;
  67. // Handle results
  68. while let Some(result) = maybe_result {
  69. // We're not expecting to receive any incoming ULP data.
  70. assert!(result.incoming_ulp_data.is_none(), "Unexpected incoming ULP data");
  71. // Send any outgoing frame
  72. if let Some(outgoing_frame) = result.outgoing_frame {
  73. tx.send((pid, outgoing_frame))
  74. .context("Failed to send outgoing frame")?;
  75. }
  76. // Handle any state update
  77. maybe_result = match result.state_update {
  78. Some(PathStateUpdate::AwaitingNominate { measured_rtt }) => {
  79. // Check if we should nominate the path
  80. //
  81. // Note: A real implementation should wait a bit and then choose the _best_ path
  82. // based on the measured RTT.
  83. trace!(?measured_rtt, "Path ready to nominate");
  84. if protocol.is_nominator() {
  85. Some(protocol.nominate_path(pid).context("Failed to nominate")?)
  86. } else {
  87. None
  88. }
  89. },
  90. Some(PathStateUpdate::Nominated { rph }) => {
  91. // The path was nominated
  92. break 'nomination (pid, rph);
  93. },
  94. None => None,
  95. }
  96. }
  97. };
  98. // ULP loop where we can use the nominated path to exchange arbitrary data. For this example, we
  99. // will send a string every 3s and print out whatever remote sent us.
  100. info!(rph = HEXLOWER.encode(&rph.0), "Path nominated, entering ULP loop");
  101. let (initial_timeout, outgoing_ulp_data) = if protocol.is_nominator() {
  102. (1000, "Tick")
  103. } else {
  104. (2000, "Tock")
  105. };
  106. let mut timeout = Duration::from_millis(initial_timeout);
  107. loop {
  108. let started_at = Instant::now();
  109. match rx.recv_timeout(timeout) {
  110. Ok((pid, incoming_frame)) => {
  111. // Calculate remaining time for the next iteration
  112. timeout = timeout.saturating_sub(Instant::elapsed(&started_at));
  113. // Receive and process incoming frame
  114. let maybe_result = process_incoming_frame(&mut protocol, pid, &incoming_frame)?;
  115. // Handle result
  116. if let Some(result) = maybe_result {
  117. // We're not expecting any state updates.
  118. assert!(result.state_update.is_none(), "Unexpected state update");
  119. // We're not expecting to send any outgoing frames since the handshake state
  120. // machine has completed.
  121. assert!(
  122. result.outgoing_frame.is_none(),
  123. "Unexpected outgoing frame in nominated state"
  124. );
  125. // We do expect incoming ULP data.
  126. let incoming_ulp_data =
  127. String::from_utf8(result.incoming_ulp_data.expect("Expecting incoming ULP data"))
  128. .context("Failed to decode ULP data string")?;
  129. info!(data = incoming_ulp_data, ?incoming_frame, "Received ULP data");
  130. }
  131. },
  132. Err(RecvTimeoutError::Timeout) => {
  133. // Create outgoing frame
  134. let result = protocol
  135. .create_ulp_frame(outgoing_ulp_data.as_bytes().to_vec())
  136. .context("Failed to create ULP frame")?;
  137. info!(
  138. data = outgoing_ulp_data,
  139. outgoing_frame = ?result.outgoing_frame,
  140. "Sending ULP data"
  141. );
  142. // We're not expecting any state updates.
  143. assert!(result.state_update.is_none(), "Unexpected state update");
  144. // Send any outgoing frame
  145. if let Some(outgoing_frame) = result.outgoing_frame {
  146. tx.send((nominated_pid, outgoing_frame))
  147. .context("Failed to send outgoing frame")?;
  148. }
  149. // We're not expecting to receive any incoming ULP data.
  150. assert!(result.incoming_ulp_data.is_none(), "Unexpected incoming ULP data");
  151. // Reset timeout
  152. timeout = Duration::from_millis(2000);
  153. },
  154. Err(RecvTimeoutError::Disconnected) => {
  155. return Err(RecvTimeoutError::Disconnected).context("Failed to receive incoming frame")?;
  156. },
  157. }
  158. }
  159. }
  160. fn main() {
  161. // Configure logging
  162. init_stderr_logging(Level::TRACE);
  163. // Communication channels for RID and RRD
  164. let (to_rrd, from_rid) = mpsc::channel::<(u32, OutgoingFrame)>();
  165. let (to_rid, from_rrd) = mpsc::channel::<(u32, OutgoingFrame)>();
  166. // Start RID
  167. let rid_thread = thread::spawn(move || {
  168. trace_span!("initiator").in_scope(|| {
  169. // Create and run protocol for RID
  170. let protocol = RendezvousProtocol::new_as_rid(true, AuthenticationKey(Keys::AK), &[0x1, 0x2]);
  171. let result = run_protocol(protocol, vec![], to_rrd, from_rrd);
  172. info!("Initiator stopped: {result:?}");
  173. });
  174. });
  175. // Start RRD
  176. let rrd_thread = thread::spawn(move || {
  177. trace_span!("responder").in_scope(|| {
  178. // Create and run protocol for RRD
  179. let (protocol, initial_outgoing_frames) =
  180. RendezvousProtocol::new_as_rrd(false, AuthenticationKey(Keys::AK), &[0x1, 0x2]);
  181. let result = run_protocol(protocol, initial_outgoing_frames, to_rid, from_rid);
  182. info!("Responder stopped: {result:?}");
  183. });
  184. });
  185. // Join threads
  186. let _ = [rid_thread, rrd_thread].map(|handle| handle.join().expect("Joining threads failed"));
  187. }