Multi-threading Patterns

Many zipper types implement [Send] and/or [Sync], meaning it is legal to send zippers across channels and distribute work to multiple threads that all operate on the same underlying trie. The zipper exclusivity rules uphold the guarantees that prevent data races.
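As a rough analogy using only the standard library (no `pathmap` types are involved), the sketch below moves exclusive `&mut` chunks of one buffer across channels to scoped worker threads. The chunks play the role that exclusive write zippers play in the trie: each handle is `Send`, and because no two handles overlap, the workers cannot race. The function name `double_in_parallel` and the doubling workload are hypothetical and chosen only for illustration.

```rust
use std::sync::mpsc;
use std::thread;

/// Doubles every element of `data`, splitting the work across `workers` threads.
/// Each worker receives an exclusive, non-overlapping `&mut [u64]` chunk over
/// its own channel. (Hypothetical helper, not part of any library.)
fn double_in_parallel(data: &mut [u64], workers: usize) {
    assert!(workers > 0);
    // Ceiling division so every element lands in some chunk; never zero,
    // because `chunks_mut(0)` would panic.
    let chunk_len = ((data.len() + workers - 1) / workers).max(1);

    thread::scope(|scope| {
        // One shared channel on which workers report completion
        let (done_tx, done_rx) = mpsc::channel::<usize>();
        let mut senders = Vec::with_capacity(workers);

        for idx in 0..workers {
            let (tx, rx) = mpsc::channel::<&mut [u64]>();
            senders.push(tx);
            let done_tx = done_tx.clone();
            scope.spawn(move || {
                // Blocks until a chunk arrives; the loop ends once the
                // matching sender has been dropped.
                for chunk in rx {
                    for x in chunk.iter_mut() {
                        *x *= 2;
                    }
                    done_tx.send(idx).unwrap();
                }
            });
        }
        drop(done_tx);

        // Hand each worker its own exclusive chunk
        let mut sent = 0;
        for (chunk, tx) in data.chunks_mut(chunk_len).zip(&senders) {
            tx.send(chunk).unwrap();
            sent += 1;
        }

        // Wait for one completion signal per chunk that was sent
        for _ in 0..sent {
            done_rx.recv().unwrap();
        }

        // Closing the channels lets the workers exit so the scope can join them
        drop(senders);
    });
}
```

Calling `double_in_parallel(&mut values, 4)` on a `Vec<u64>` named `values` doubles it in place. The borrow checker rejects any attempt to send the same chunk to two workers, which is a compile-time counterpart of the exclusivity that the zipper rules enforce.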

NOTE: Rust's default allocator tends to become the bottleneck when creating and editing lots of trie paths in parallel. In our experiments, jemalloc alleviates this issue; the jemalloc crate feature can be enabled to switch the default allocator for the process.

Parallel Example

The code below spawns several worker threads and makes a deep copy of every item in the "in" subtrie into the "out" subtrie. It illustrates just one possible approach to distributing a workload across multiple threads; many other patterns are possible.

#![allow(unused)]
fn main() {
extern crate pathmap;
use pathmap::{PathMap, zipper::*};

let elements = 65535;
let thread_cnt: usize = 4;
let elements_per_thread = elements / thread_cnt;

//Pre-initialize the data in the `PathMap`, for demonstration purposes
let mut map = PathMap::<usize>::new();
let mut zipper = map.write_zipper_at_path(b"in");
for n in 0..thread_cnt {
    for i in (n * elements_per_thread)..((n+1) * elements_per_thread) {
        zipper.descend_to_byte(n as u8);
        zipper.descend_to(i.to_be_bytes());
        zipper.set_val(i);
        zipper.reset();
    }
}
drop(zipper);

let zipper_head = map.zipper_head();

std::thread::scope(|scope| {

    //Allocate channels to send the zippers
    let mut zipper_senders: Vec<std::sync::mpsc::Sender<(ReadZipperTracked<'_, '_, usize>, WriteZipperTracked<'_, '_, usize>)>> = Vec::with_capacity(thread_cnt);
    let mut signal_receivers: Vec<std::sync::mpsc::Receiver<bool>> = Vec::with_capacity(thread_cnt);

    //Spawn all the threads
    for _thread_idx in 0..thread_cnt {
        let (zipper_tx, zipper_rx) = std::sync::mpsc::channel();
        zipper_senders.push(zipper_tx);
        let (signal_tx, signal_rx) = std::sync::mpsc::channel::<bool>();
        signal_receivers.push(signal_rx);

        //=====================================================================================
        // Worker Thread Body
        //=====================================================================================
        scope.spawn(move || {
            loop {

                //The thread will block here waiting for the zippers to be sent
                match zipper_rx.recv() {
                    Ok((mut reader_z, mut writer_z)) => {

                        //We got the zippers; copy every value under the reader's root to the same relative path under the writer's root
                        let witness = reader_z.witness();
                        while let Some(val) = reader_z.to_next_get_val_with_witness(&witness) {
                            writer_z.descend_to(reader_z.path());
                            writer_z.set_val(*val);
                            writer_z.reset();
                        }

                        //Tell the main thread we're done
                        signal_tx.send(true).unwrap();
                    },
                    Err(_) => {
                        //The zipper_sender channel is closed, meaning it's time to shut down
                        break;
                    }
                }
            }
        });
        //=====================================================================================
        // End of Worker Thread Body
        //=====================================================================================
    }

    let mut writer_z = zipper_head.write_zipper_at_exclusive_path(b"out").unwrap();
    writer_z.remove_branches(true);
    drop(writer_z);

    //Make a ReadZipper and a WriteZipper for each thread, and send them through the channel
    for n in 0..thread_cnt {
        let path = vec![b'o', b'u', b't', n as u8];
        let writer_z = zipper_head.write_zipper_at_exclusive_path(path).unwrap();
        let path = vec![b'i', b'n', n as u8];
        let reader_z = zipper_head.read_zipper_at_path(path).unwrap();

        zipper_senders[n].send((reader_z, writer_z)).unwrap();
    }

    //Wait for the threads to all be done
    for n in 0..thread_cnt {
        assert!(signal_receivers[n].recv().unwrap());
    }
});
drop(zipper_head);
}
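To make the path layout in the example easier to follow, here is a small standard-library-only sketch of how each element's path is put together: the root bytes ("in" or "out"), one shard byte `n`, then the big-endian bytes of the index `i`. The helpers `element_path` and `shard_range` are hypothetical names that do not exist in `pathmap`. The "out" layout also assumes that the reader's `path()` is relative to the reader zipper's root.

```rust
use std::mem::size_of;
use std::ops::Range;

/// Builds the full path of element `i` in shard `n` under `root`, mirroring the
/// descend calls in the example: root bytes, one shard byte, then the
/// big-endian bytes of `i`. (Hypothetical helper, for illustration only.)
fn element_path(root: &[u8], n: usize, i: usize) -> Vec<u8> {
    let mut path = Vec::with_capacity(root.len() + 1 + size_of::<usize>());
    path.extend_from_slice(root);
    path.push(n as u8);
    path.extend_from_slice(&i.to_be_bytes());
    path
}

/// Returns the half-open range of element indices that shard `n` owns,
/// matching the inner loop bounds used to pre-initialize the map.
fn shard_range(n: usize, elements_per_thread: usize) -> Range<usize> {
    (n * elements_per_thread)..((n + 1) * elements_per_thread)
}
```

With `elements = 65535` and `thread_cnt = 4`, integer division gives 16383 elements per shard, so the four shards cover indices 0 through 65531 and the last three indices are never inserted. Because every path in a shard begins with that shard's byte, a zipper rooted at `[b'o', b'u', b't', n]` covers a subtrie that no other worker touches, which is what lets each thread hold its own exclusive write zipper.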