Multi-threading Patterns
Many zipper types implement [Send] and/or [Sync], meaning it is legal to send zippers across channels to distribute work to multiple threads, all working in the same underlying trie. The zipper exclusivity rules uphold the guarantees that prevent data races.
NOTE: Rust's default allocator tends to become the bottleneck when creating and editing lots of trie paths in parallel. Experimentally we have found that jemalloc alleviates this issue, and the jemalloc crate feature can be enabled to switch the default allocator for the process.
Parallel Example
The code below will spawn parallel threads, and make a deep copy of every item in the "in" subtrie into the "out" subtrie. This just illustrates one possible approach to distributing a workload across multiple threads, and many other patterns are possible.
#![allow(unused)] fn main() { extern crate pathmap; use pathmap::{PathMap, zipper::*}; let elements = 65535; let thread_cnt: usize = 4; let elements_per_thread = elements / thread_cnt; //Pre-initialize the data in the `PathMap`, for demonstration purposes let mut map = PathMap::<usize>::new(); let mut zipper = map.write_zipper_at_path(b"in"); for n in 0..thread_cnt { for i in (n * elements_per_thread)..((n+1) * elements_per_thread) { zipper.descend_to_byte(n as u8); zipper.descend_to(i.to_be_bytes()); zipper.set_val(i); zipper.reset(); } } drop(zipper); let zipper_head = map.zipper_head(); std::thread::scope(|scope| { //Allocate channels to send the zippers let mut zipper_senders: Vec<std::sync::mpsc::Sender<(ReadZipperTracked<'_, '_, usize>, WriteZipperTracked<'_, '_, usize>)>> = Vec::with_capacity(thread_cnt); let mut signal_receivers: Vec<std::sync::mpsc::Receiver<bool>> = Vec::with_capacity(thread_cnt); //Spawn all the threads for _thread_idx in 0..thread_cnt { let (zipper_tx, zipper_rx) = std::sync::mpsc::channel(); zipper_senders.push(zipper_tx); let (signal_tx, signal_rx) = std::sync::mpsc::channel::<bool>(); signal_receivers.push(signal_rx); //===================================================================================== // Worker Thread Body //===================================================================================== scope.spawn(move || { loop { //The thread will block here waiting for the zippers to be sent match zipper_rx.recv() { Ok((mut reader_z, mut writer_z)) => { //We got the zippers, do the stuff let witness = reader_z.witness(); while let Some(val) = reader_z.to_next_get_val_with_witness(&witness) { writer_z.descend_to(reader_z.path()); writer_z.set_val(*val); writer_z.reset(); } //Tell the main thread we're done signal_tx.send(true).unwrap(); }, Err(_) => { //The zipper_sender channel is closed, meaning it's time to shut down break; } } } }); //===================================================================================== 
// End of Worker Thread Body //===================================================================================== } let mut writer_z = zipper_head.write_zipper_at_exclusive_path(b"out").unwrap(); writer_z.remove_branches(true); drop(writer_z); //Make a ReadZipper and a WriteZipper for each thread, and send them through the channel for n in 0..thread_cnt { let path = vec![b'o', b'u', b't', n as u8]; let writer_z = zipper_head.write_zipper_at_exclusive_path(path).unwrap(); let path = vec![b'i', b'n', n as u8]; let reader_z = zipper_head.read_zipper_at_path(path).unwrap(); zipper_senders[n].send((reader_z, writer_z)).unwrap(); }; //Wait for the threads to all be done for n in 0..thread_cnt { assert_eq!(signal_receivers[n].recv().unwrap(), true); }; }); drop(zipper_head); }