| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213 |
- //! Example for usage of the Connection Rendezvous Protocol state machine, using MPSC channels to
- //! simulate paths.
- #![expect(unused_crate_dependencies, reason = "Example triggered false positive")]
- use core::time::Duration;
- use std::{
- sync::mpsc::{self, RecvTimeoutError},
- thread,
- time::Instant,
- };
- use anyhow::Context as _;
- use data_encoding::HEXLOWER;
- use libthreema::{
- d2d_rendezvous::{
- AuthenticationKey, OutgoingFrame, PathProcessResult, PathStateUpdate, RendezvousProtocol,
- },
- utils::logging::init_stderr_logging,
- };
- use tracing::{Level, info, trace, trace_span, warn};
- struct Keys;
- impl Keys {
- const AK: [u8; 32] = [
- 0x1, 0x1, 0x1, 0x1, 0x1, 0x1, 0x1, 0x1, 0x1, 0x1, 0x1, 0x1, 0x1, 0x1, 0x1, 0x1, 0x1, 0x1, 0x1, 0x1,
- 0x1, 0x1, 0x1, 0x1, 0x1, 0x1, 0x1, 0x1, 0x1, 0x1, 0x1, 0x1,
- ];
- }
- fn process_incoming_frame(
- protocol: &mut RendezvousProtocol,
- pid: u32,
- incoming_frame: &OutgoingFrame,
- ) -> anyhow::Result<Option<PathProcessResult>> {
- let (header, payload) = incoming_frame.encode();
- if let Some(nominated_pid) = protocol.nominated_path()
- && pid != nominated_pid
- {
- warn!(
- pid,
- ?incoming_frame,
- "Discarding chunk for unknown or dropped path"
- );
- return Ok(None);
- }
- // Process incoming frame
- protocol
- .add_chunks(pid, &[header.as_slice(), payload])
- .context("Failed to add chunk")?;
- let result = protocol.process_frame(pid).context("Failed to process frame")?;
- Ok(result)
- }
- #[expect(clippy::needless_pass_by_value, reason = "Prevent re-use")]
- fn run_protocol(
- mut protocol: RendezvousProtocol,
- initial_outgoing_frames: Vec<(u32, OutgoingFrame)>,
- tx: mpsc::Sender<(u32, OutgoingFrame)>,
- rx: mpsc::Receiver<(u32, OutgoingFrame)>,
- ) -> anyhow::Result<()> {
- // Send initial frames
- for outgoing_frame in initial_outgoing_frames {
- tx.send(outgoing_frame)?;
- }
- // Nomination loop where we run the handshakes simultaneously over all available paths until we
- // have nominated one path.
- info!("Entering nomination loop");
- let (nominated_pid, rph) = 'nomination: loop {
- // Receive and process incoming frame
- let (pid, incoming_frame) = rx.recv().context("Failed to receive incoming frame")?;
- let mut maybe_result = process_incoming_frame(&mut protocol, pid, &incoming_frame)?;
- // Handle results
- while let Some(result) = maybe_result {
- // We're not expecting to receive any incoming ULP data.
- assert!(result.incoming_ulp_data.is_none(), "Unexpected incoming ULP data");
- // Send any outgoing frame
- if let Some(outgoing_frame) = result.outgoing_frame {
- tx.send((pid, outgoing_frame))
- .context("Failed to send outgoing frame")?;
- }
- // Handle any state update
- maybe_result = match result.state_update {
- Some(PathStateUpdate::AwaitingNominate { measured_rtt }) => {
- // Check if we should nominate the path
- //
- // Note: A real implementation should wait a bit and then choose the _best_ path
- // based on the measured RTT.
- trace!(?measured_rtt, "Path ready to nominate");
- if protocol.is_nominator() {
- Some(protocol.nominate_path(pid).context("Failed to nominate")?)
- } else {
- None
- }
- },
- Some(PathStateUpdate::Nominated { rph }) => {
- // The path was nominated
- break 'nomination (pid, rph);
- },
- None => None,
- }
- }
- };
- // ULP loop where we can use the nominated path to exchange arbitrary data. For this example, we
- // will send a string every 3s and print out whatever remote sent us.
- info!(rph = HEXLOWER.encode(&rph.0), "Path nominated, entering ULP loop");
- let (initial_timeout, outgoing_ulp_data) = if protocol.is_nominator() {
- (1000, "Tick")
- } else {
- (2000, "Tock")
- };
- let mut timeout = Duration::from_millis(initial_timeout);
- loop {
- let started_at = Instant::now();
- match rx.recv_timeout(timeout) {
- Ok((pid, incoming_frame)) => {
- // Calculate remaining time for the next iteration
- timeout = timeout.saturating_sub(Instant::elapsed(&started_at));
- // Receive and process incoming frame
- let maybe_result = process_incoming_frame(&mut protocol, pid, &incoming_frame)?;
- // Handle result
- if let Some(result) = maybe_result {
- // We're not expecting any state updates.
- assert!(result.state_update.is_none(), "Unexpected state update");
- // We're not expecting to send any outgoing frames since the handshake state
- // machine has completed.
- assert!(
- result.outgoing_frame.is_none(),
- "Unexpected outgoing frame in nominated state"
- );
- // We do expect incoming ULP data.
- let incoming_ulp_data =
- String::from_utf8(result.incoming_ulp_data.expect("Expecting incoming ULP data"))
- .context("Failed to decode ULP data string")?;
- info!(data = incoming_ulp_data, ?incoming_frame, "Received ULP data");
- }
- },
- Err(RecvTimeoutError::Timeout) => {
- // Create outgoing frame
- let result = protocol
- .create_ulp_frame(outgoing_ulp_data.as_bytes().to_vec())
- .context("Failed to create ULP frame")?;
- info!(
- data = outgoing_ulp_data,
- outgoing_frame = ?result.outgoing_frame,
- "Sending ULP data"
- );
- // We're not expecting any state updates.
- assert!(result.state_update.is_none(), "Unexpected state update");
- // Send any outgoing frame
- if let Some(outgoing_frame) = result.outgoing_frame {
- tx.send((nominated_pid, outgoing_frame))
- .context("Failed to send outgoing frame")?;
- }
- // We're not expecting to receive any incoming ULP data.
- assert!(result.incoming_ulp_data.is_none(), "Unexpected incoming ULP data");
- // Reset timeout
- timeout = Duration::from_secs(2);
- },
- Err(RecvTimeoutError::Disconnected) => {
- return Err(RecvTimeoutError::Disconnected).context("Failed to receive incoming frame")?;
- },
- }
- }
- }
- fn main() {
- // Configure logging
- init_stderr_logging(Level::TRACE);
- // Communication channels for RID and RRD
- let (to_rrd, from_rid) = mpsc::channel::<(u32, OutgoingFrame)>();
- let (to_rid, from_rrd) = mpsc::channel::<(u32, OutgoingFrame)>();
- // Start RID
- let rid_thread = thread::spawn(move || {
- trace_span!("initiator").in_scope(|| {
- // Create and run protocol for RID
- let protocol = RendezvousProtocol::new_as_rid(true, AuthenticationKey(Keys::AK), &[0x1, 0x2]);
- let result = run_protocol(protocol, vec![], to_rrd, from_rrd);
- info!("Initiator stopped: {result:?}");
- });
- });
- // Start RRD
- let rrd_thread = thread::spawn(move || {
- trace_span!("responder").in_scope(|| {
- // Create and run protocol for RRD
- let (protocol, initial_outgoing_frames) =
- RendezvousProtocol::new_as_rrd(false, AuthenticationKey(Keys::AK), &[0x1, 0x2]);
- let result = run_protocol(protocol, initial_outgoing_frames, to_rid, from_rid);
- info!("Responder stopped: {result:?}");
- });
- });
- // Join threads
- let _ = [rid_thread, rrd_thread].map(|handle| handle.join().expect("Joining threads failed"));
- }
|