Saturday, April 23, 2022

Go vs Rust - Concurrency

Go

Go has rich support for concurrency using goroutines and channels. 

Goroutines and channels are lightweight, built-in features for running functions concurrently and for communicating between those functions while they execute.

Goroutines

A goroutine is a function that is capable of running concurrently with other functions. To create a goroutine, we use the keyword go followed by a function call:

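package main

import "fmt"

func f(n int) {
  for i := 0; i < 10; i++ {
    fmt.Println(n, ":", i)
  }
}

func main() {
  // Start f in a new goroutine; main continues immediately.
  go f(0)
  // Block on keyboard input so main (and with it the whole program)
  // does not exit before the goroutine has had a chance to run.
  var input string
  fmt.Scanln(&input)
}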
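Waiting for keyboard input is only a trick to keep main alive: a Go program exits as soon as main returns, even if goroutines are still running. A more common approach is sync.WaitGroup from the standard library. The sketch below assumes a hypothetical helper, runSquares, that starts one goroutine per index and waits for all of them:

```go
package main

import (
	"fmt"
	"sync"
)

// runSquares (hypothetical) starts n goroutines and waits for all of them.
// Each goroutine writes only to its own slot of results, so no two
// goroutines touch the same memory and no lock is needed.
func runSquares(n int) []int {
	results := make([]int, n)
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1) // register the goroutine before starting it
		go func(i int) {
			defer wg.Done() // mark this goroutine as finished
			results[i] = i * i
		}(i)
	}
	wg.Wait() // block until every goroutine has called Done
	return results
}

func main() {
	fmt.Println(runSquares(5)) // [0 1 4 9 16]
}
```

Passing i as an argument gives each goroutine its own copy of the loop counter, which keeps the sketch correct on Go versions older than 1.22 as well.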

Channels

Channels provide a way for two goroutines to communicate with one another and synchronize their execution. Here is an example program using channels:

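package main

import (
  "fmt"
  "time"
)

// pinger sends "ping" on c forever; each send blocks until printer receives.
func pinger(c chan string) {
  for {
    c <- "ping"
  }
}

// printer receives from c and prints each message, roughly once per second.
func printer(c chan string) {
  for {
    msg := <-c
    fmt.Println(msg)
    time.Sleep(time.Second)
  }
}

func main() {
  c := make(chan string)
  go pinger(c)
  go printer(c)
  // Wait for Enter so the program doesn't exit immediately.
  var input string
  fmt.Scanln(&input)
}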
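Because a send on an unbuffered channel waits for a matching receive, a channel can carry a result and act as a synchronization point at the same time. A minimal sketch, assuming a hypothetical helper named squareAsync:

```go
package main

import "fmt"

// squareAsync (hypothetical) starts a goroutine that computes x*x and
// sends it on an unbuffered channel. The caller's receive blocks until
// that send happens, so the channel both delivers the result and
// synchronizes the two goroutines.
func squareAsync(x int) chan int {
	out := make(chan int)
	go func() {
		out <- x * x // blocks until the caller receives
	}()
	return out
}

func main() {
	a := squareAsync(3)
	b := squareAsync(4)
	fmt.Println(<-a + <-b) // 25
}
```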

Channel Direction

We can specify a direction on a channel type, restricting it to either sending or receiving. For example, pinger's function signature can be changed to this:

func pinger(c chan<- string)

Now c can only be sent to; attempting to receive from c will result in a compile-time error. Similarly, we can change printer to this:

func printer(c <-chan string)

A channel that doesn't have these restrictions is known as bi-directional. A bi-directional channel can be passed to a function that takes send-only or receive-only channels, but the reverse is not true.
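A short sketch of directional channels in use, assuming two hypothetical functions, produce and consume. The bidirectional channel created in main is converted implicitly when it is passed to each of them:

```go
package main

import "fmt"

// produce may only send on out; writing <-out here would not compile.
func produce(out chan<- int, n int) {
	for i := 1; i <= n; i++ {
		out <- i
	}
}

// consume may only receive from in; writing in <- 1 here would not compile.
// It reads exactly n values and returns their sum.
func consume(in <-chan int, n int) int {
	sum := 0
	for i := 0; i < n; i++ {
		sum += <-in
	}
	return sum
}

func main() {
	c := make(chan int)        // bidirectional
	go produce(c, 4)           // c converts implicitly to chan<- int
	fmt.Println(consume(c, 4)) // c converts implicitly to <-chan int; prints 10
}
```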

Select

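Go has a special statement called select, which works like a switch but for channels: it waits until one of its cases can proceed and then runs that case. If several cases are ready at once, one of them is chosen at random.

The select statement is often used to implement a timeout. time.After returns a channel that receives a value once the given duration has passed, so the third case below runs if neither c1 nor c2 delivers a message within one second:

select {
case msg1 := <-c1:
  fmt.Println("Message 1", msg1)
case msg2 := <-c2:
  fmt.Println("Message 2", msg2)
case <-time.After(time.Second):
  fmt.Println("timeout")
}

A select can also have a default case, which runs immediately if none of the channels are ready. Don't combine default with a timeout case: since default fires right away whenever nothing is ready, the time.After case would never be reached.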

Buffered Channels

It's also possible to pass a second parameter to the make function when creating a channel:

c := make(chan int, 1)

This creates a buffered channel with a capacity of 1. Normally channels are synchronous: both sides of the channel wait until the other side is ready. A buffered channel is asynchronous: a send blocks only when the buffer is full, and a receive blocks only when the buffer is empty.
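A small sketch of that behavior, assuming a hypothetical function fillAndDrain: one goroutine sends n values into a channel with capacity n and none of the sends block, even though no receiver is running yet. With an unbuffered channel the first send would deadlock.

```go
package main

import "fmt"

// fillAndDrain (hypothetical) fills a channel of capacity n from the
// current goroutine, then reads everything back.
func fillAndDrain(n int) (buffered int, received []int) {
	c := make(chan int, n)
	for i := 0; i < n; i++ {
		c <- i // never blocks: the buffer has room for all n values
	}
	buffered = len(c) // number of values currently queued
	close(c)          // closing lets the range loop below terminate
	for v := range c {
		received = append(received, v) // values come out in FIFO order
	}
	return buffered, received
}

func main() {
	buffered, received := fillAndDrain(3)
	fmt.Println(buffered, received) // 3 [0 1 2]
}
```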

Anonymous functions as goroutines

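A goroutine can also be started from an anonymous function (a function literal) that is declared and invoked in the same statement:

package main

import "fmt"

func main() {
  fmt.Println("We are executing a goroutine")
  arr := []int{2, 3, 4}
  // Buffer the channel so the goroutine never blocks on a send.
  ch := make(chan int, len(arr))
  // The function literal is defined and called immediately by go.
  go func(arr []int, ch chan int) {
    for _, elem := range arr {
      ch <- elem * 3
    }
  }(arr, ch)
  // Receive exactly one result per input element.
  for i := 0; i < len(arr); i++ {
    fmt.Printf("Result: %v\n", <-ch)
  }
}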
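Instead of counting how many values to receive, the sending goroutine can close the channel when it is done and the receiver can range over it. A sketch under that assumption, with a hypothetical function tripleAll; the anonymous goroutine captures arr and ch from the enclosing function rather than taking them as parameters:

```go
package main

import "fmt"

// tripleAll (hypothetical) starts an anonymous goroutine that sends each
// element multiplied by 3, then closes the channel to signal that no more
// values are coming.
func tripleAll(arr []int) []int {
	ch := make(chan int)
	go func() {
		defer close(ch) // runs after the last send
		for _, elem := range arr {
			ch <- elem * 3
		}
	}()
	var results []int
	for v := range ch { // loop ends once ch is closed and drained
		results = append(results, v)
	}
	return results
}

func main() {
	fmt.Println(tripleAll([]int{2, 3, 4})) // [6 9 12]
}
```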

Mutual exclusion

A problem that can arise with concurrency is shared resources that must not be accessed by several goroutines at the same time.

In concurrency, the block of code that modifies shared resources is called the critical section. A sync.Mutex protects it: Lock lets only one goroutine into the critical section at a time, and Unlock lets the next one in.

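package main

import (
  "fmt"
  "sync"
)

var n = 1
var mu sync.Mutex // guards n

func timesThree(wg *sync.WaitGroup) {
  defer wg.Done()
  // Only one goroutine at a time can hold the lock, so the
  // read-modify-write of n (the critical section) is never interleaved.
  mu.Lock()
  defer mu.Unlock()
  n *= 3
  fmt.Println(n)
}

func main() {
  fmt.Println("We are executing a goroutine")
  var wg sync.WaitGroup
  for i := 0; i < 10; i++ {
    wg.Add(1)
    go timesThree(&wg)
  }
  // Wait for all ten goroutines instead of sleeping for a fixed time.
  wg.Wait()
}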
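A common Go idiom is to keep the mutex in the same struct as the data it guards. The sketch below assumes a hypothetical Counter type and a hypothetical countConcurrently function. Because every increment happens inside the critical section, the final count is exact no matter how the goroutines are scheduled:

```go
package main

import (
	"fmt"
	"sync"
)

// Counter (hypothetical) keeps the mutex next to the value it guards.
type Counter struct {
	mu    sync.Mutex
	value int
}

// Inc is the critical section: the lock makes the read-modify-write exclusive.
func (c *Counter) Inc() {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.value++
}

// Value also takes the lock, so reads are synchronized with concurrent Inc calls.
func (c *Counter) Value() int {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.value
}

// countConcurrently runs `goroutines` goroutines that each call Inc perG times.
func countConcurrently(goroutines, perG int) int {
	var c Counter
	var wg sync.WaitGroup
	for g := 0; g < goroutines; g++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for i := 0; i < perG; i++ {
				c.Inc()
			}
		}()
	}
	wg.Wait()
	return c.Value()
}

func main() {
	fmt.Println(countConcurrently(100, 1000)) // 100000
}
```

Without the lock, concurrent c.value++ operations could overwrite each other and the total would usually come out lower.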

Rust

Rust's memory safety features also apply to its concurrency story. Even concurrent Rust programs must be memory safe, having no data races. Rust's type system is up to the task, and gives you powerful ways to reason about concurrent code at compile time.

Send

Rust's concurrency guarantees rest largely on two marker traits, Send and Sync. The first is Send. When a type T implements Send, values of that type can have their ownership transferred safely between threads.

This lets the compiler enforce important restrictions. For example, if a channel connects two threads, every value sent down it ends up owned by the other thread, so the sending end can only be moved to another thread when the element type implements Send. A type that is not Send, such as Rc<T>, is rejected at compile time.

Sync

The second of these traits is called Sync. When a type T implements Sync, it indicates that something of this type has no possibility of introducing memory unsafety when used from multiple threads concurrently through shared references. This implies that types which don't have interior mutability are inherently Sync, which includes simple primitive types (like u8) and aggregate types containing them.

For sharing data across threads, Rust provides Arc<T>, an atomically reference-counted pointer. Arc<T> implements Send and Sync only if T implements both Send and Sync. For example, a value of type Arc<RefCell<U>> cannot be transferred across threads: RefCell does not implement Sync, so Arc<RefCell<U>> does not implement Send.

These two traits allow you to use the type system to make strong guarantees about the properties of your code under concurrency.

Threads

The thread::spawn() function accepts a closure, which is executed in a new thread. It returns a handle (a JoinHandle) that can be used to wait for the child thread to finish and extract its result:

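use std::thread;

fn main() {
    // The closure's return value becomes the thread's result.
    let handle = thread::spawn(|| {
        "Hello from a thread!"
    });
    // join() waits for the thread and returns Err if it panicked.
    println!("{}", handle.join().unwrap());
}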

move closures

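We can force our closure to take ownership of its environment with the move keyword. thread::spawn needs this here: the new thread may outlive the function that spawned it, so the closure cannot simply borrow x.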

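use std::thread;

fn main() {
    let x = 1;
    // `move` copies x into the closure instead of borrowing it.
    let handle = thread::spawn(move || {
        println!("x is {}", x);
    });
    // Without join, main could return before the thread prints anything.
    handle.join().unwrap();
}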

Safe Shared Mutable State

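To mutate data from several threads, wrap it in a Mutex, which gives one thread at a time exclusive access, and share the Mutex through an Arc, which gives every thread shared ownership:

use std::sync::{Arc, Mutex};
use std::thread;

fn main() {
    let data = Arc::new(Mutex::new(vec![1, 2, 3]));
    let mut handles = Vec::new();
    for i in 0..3 {
        // Clone the Arc (bumping the reference count), not the vector.
        let data = Arc::clone(&data);
        handles.push(thread::spawn(move || {
            // lock() blocks until the mutex is free; the guard unlocks on drop.
            let mut data = data.lock().unwrap();
            data[0] += i;
        }));
    }
    // Join instead of sleeping, so every thread is guaranteed to have finished.
    for handle in handles {
        handle.join().unwrap();
    }
    println!("{:?}", *data.lock().unwrap()); // prints [4, 2, 3]
}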

Channels

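Rust's standard library provides channels in std::sync::mpsc (multi-producer, single-consumer). In this example each thread increments a shared counter and then sends a message telling the main thread it has finished:

use std::sync::mpsc;
use std::sync::{Arc, Mutex};
use std::thread;

fn main() {
    let data = Arc::new(Mutex::new(0));
    // `tx` is the "transmitter" or "sender".
    // `rx` is the "receiver".
    let (tx, rx) = mpsc::channel();
    for _ in 0..10 {
        // Each thread gets its own clone of the Arc and of the sender.
        let (data, tx) = (Arc::clone(&data), tx.clone());
        thread::spawn(move || {
            let mut data = data.lock().unwrap();
            *data += 1;
            // Tell the main thread this increment is done.
            tx.send(()).unwrap();
        });
    }
    // Wait for one message per spawned thread.
    for _ in 0..10 {
        rx.recv().unwrap();
    }
    println!("{}", *data.lock().unwrap()); // prints 10
}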




