Course page: https://pdos.csail.mit.edu/6.824/schedule.html
The following content comes from the course lecture notes. Since the plain-text notes display poorly, I have reformatted them and added some personal annotations.

Implementing distributed systems

Core infrastructures: threads and RPC (in Go)
Important for the labs

Why Go?

  1. good support for threads
  2. convenient RPC
  3. type- and memory-safe
  4. garbage-collected (no use-after-free problems)
    threads + GC is particularly attractive!
  5. not too complex
  6. many recent distributed systems implemented in Go

After the tutorial, use https://golang.org/doc/effective_go.html

Threads

  • a useful structuring tool, but can be tricky
  • Go calls them goroutines; everyone else calls them threads

Thread = “thread of execution”

  • threads allow one program to do many things at once
  • each thread executes serially, just like an ordinary non-threaded program
  • threads share memory
  • each thread includes some per-thread state:
    program counter, registers, stack, what it’s waiting for

serially = sequentially, one step at a time

Related reading: the differences and connections among processes, threads, and coroutines.

Why threads?

  • I/O concurrency
    Client sends requests to many servers in parallel and waits for replies.
    Server processes multiple client requests; each request may block.
    While waiting for the disk to read data for client X, process a request from client Y.
  • Multicore performance
    Execute code in parallel on several cores.
  • Convenience
    In background, once per second, check whether each worker is still alive.
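
For the "Convenience" point, a minimal hedged sketch: a background goroutine checks once per second whether each worker is still alive. The workers list and the alive() helper are hypothetical stand-ins, not lecture code.

package main

import (
    "fmt"
    "time"
)

// alive is a placeholder; a real check would send a heartbeat or RPC.
func alive(worker string) bool {
    return true
}

func main() {
    workers := []string{"w1", "w2", "w3"}
    go func() {
        for {
            for _, w := range workers {
                if !alive(w) {
                    fmt.Printf("worker %s appears dead\n", w)
                }
            }
            time.Sleep(1 * time.Second) // once per second, in the background
        }
    }()
    time.Sleep(3 * time.Second) // main keeps doing other work meanwhile
}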

Is there an alternative to threads?

  • Yes: write code that explicitly interleaves activities, in a single thread.
    Usually called “event-driven”.
  • Keep a table of state about each activity, e.g. each client request.
  • One “event” loop that:
    checks for new input for each activity (e.g. arrival of reply from server),
    does the next step for each activity,
    updates state.
  • Event-driven gets you I/O concurrency,
    and eliminates thread costs (which can be substantial),
    but doesn’t get multi-core speedup,
    and is painful to program.

This "event-driven" style feels a lot like select and epoll.
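
A minimal single-threaded sketch of that event loop (the event type and state table are hypothetical; a real loop would poll the network, e.g. via select/epoll):

package main

import "fmt"

// event is a hypothetical unit of input for one activity (one client request).
type event struct {
    clientID int
    input    string
}

func main() {
    // a table of state about each activity, keyed by client
    state := make(map[int]string)
    events := []event{{1, "hello"}, {2, "hi"}, {1, "bye"}}

    // the single "event" loop: take the next input, do the next step
    // for that activity, and update its state -- no threads involved
    for _, e := range events {
        state[e.clientID] = e.input
        fmt.Printf("client %d advanced to %q\n", e.clientID, state[e.clientID])
    }
}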

Threading challenges:

  1. sharing data safely
    what if two threads do n = n + 1 at the same time?
    or one thread reads while another increments?
    this is a “race” – and is often a bug
    -> use locks (Go's sync.Mutex); see the sketch after this list
    -> or avoid sharing mutable data

    One way to avoid sharing is, for example, to use channels.

  2. coordination between threads
    one thread is producing data, another thread is consuming it
    how can the consumer wait (and release the CPU)?
    how can the producer wake up the consumer?
    -> use Go channels or sync.Cond or sync.WaitGroup

  3. deadlock
    cycles via locks and/or communication (e.g. RPC or Go channels)
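
A minimal sketch of challenge 1: a race on n = n + 1 and its sync.Mutex fix (the counter and the 1000 goroutines are just illustrative):

package main

import (
    "fmt"
    "sync"
)

func main() {
    var mu sync.Mutex
    var wg sync.WaitGroup
    n := 0
    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            mu.Lock() // without the lock, two threads doing n = n + 1 race
            n = n + 1 // read-modify-write is not atomic
            mu.Unlock()
        }()
    }
    wg.Wait()
    fmt.Println(n) // reliably 1000 with the lock; unpredictable without it
}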


Tutorial's web crawler as a threading example

Crawler challenges

  • Exploit I/O concurrency
    Network latency is more limiting than network capacity
    Fetch many pages in parallel
    To increase URLs fetched per second
    => Use threads for concurrency
  • Fetch each URL only once
    avoid wasting network bandwidth
    be nice to remote servers
    => Need to remember which URLs have been visited
  • Know when finished

ConcurrentMutex crawler

Why the Mutex (Lock() and Unlock()) in testAndSet?

  • the lock protects the data
    The code between lock/unlock is often called a “critical section”
  • Internally, a map is a complex data structure (tree? expandable hash?)
    Concurrent update/update may wreck internal invariants
    Concurrent update/read may crash the reader
type fetchState struct {
    mu      sync.Mutex
    fetched map[string]bool
}

func (fs *fetchState) testAndSet(url string) bool {
    fs.mu.Lock() // the lock protects the data
    defer fs.mu.Unlock()
    r := fs.fetched[url]
    fs.fetched[url] = true
    return r
}

func ConcurrentMutex(url string, fetcher Fetcher, fs *fetchState) {
    if fs.testAndSet(url) {
        return
    }
    urls, err := fetcher.Fetch(url)
    if err != nil {
        return
    }
    var done sync.WaitGroup
    for _, u := range urls {
        done.Add(1)
        go func(u string) { // anonymous function
            defer done.Done()
            ConcurrentMutex(u, fetcher, fs)
        }(u)
    }
    done.Wait() // sleep until the counter reaches zero; sleeping uses no CPU time
    return
}

How does the ConcurrentMutex crawler decide it is done?
sync.WaitGroup
Wait() waits for all Add()s to be balanced by Done()s
i.e. waits for all child threads to finish

ConcurrentChannel crawler

  • channels both communicate and synchronize
  • several threads can send and receive on a channel
  • channels are cheap
func worker(url string, ch chan []string, fetcher Fetcher) {
    urls, err := fetcher.Fetch(url)
    if err != nil {
        ch <- []string{}
    } else {
        ch <- urls
    }
}

func coordinator(ch chan []string, fetcher Fetcher) {
    n := 1
    fetched := make(map[string]bool)
    for urls := range ch {
        for _, u := range urls {
            if fetched[u] == false {
                fetched[u] = true
                n += 1
                go worker(u, ch, fetcher)
            }
        }
        n -= 1
        if n == 0 {
            break
        }
    }
}

func ConcurrentChannel(url string, fetcher Fetcher) {
    ch := make(chan []string)
    go func() {
        ch <- []string{url}
    }()
    coordinator(ch, fetcher)
}
  • Note: there is no recursion here; instead there’s a work list.
  • Note: no need to lock the fetched map, because it isn’t shared!

Worker thread writes url slice, coordinator reads it, is that a race?

  • worker only writes slice before sending
  • coordinator only reads slice after receiving
    So they can’t use the slice at the same time.

Why does ConcurrentChannel() create a goroutine just for “ch <- …”?

Thought experiment: get rid of the goroutine and send directly. Since ch is unbuffered, the send would block until someone receives, but coordinator (the only receiver) hasn't been called yet, so ConcurrentChannel() would deadlock.

My understanding: on an unbuffered channel, a send blocks until another goroutine is ready to receive from it.

When to use sharing and locks, versus channels?

  • Most problems can be solved in either style
  • What makes the most sense depends on how the programmer thinks
    • state – sharing and locks
    • communication – channels
  • For the 6.824 labs, I recommend sharing+locks for state,
    and sync.Cond or channels or time.Sleep() for waiting/notification.

In my labs, I used locks for state, but a channel for the task queue.

vote-count example

vote-count-4.go

A relatively good example of using Mutex + Cond.

According to 《Go 语言高性能编程》: the sync.Cond condition variable coordinates goroutines that want to access a shared resource; when the state of the shared resource changes, it can notify goroutines blocked on the mutex.

  • NewCond creates an instance
    func NewCond(l Locker) *Cond

  • Broadcast wakes all goroutines waiting on condition variable c; the caller need not hold the lock
    func (c *Cond) Broadcast()

  • Signal wakes exactly one arbitrary goroutine waiting on condition variable c; the caller need not hold the lock
    func (c *Cond) Signal()

  • Calling Wait automatically releases the lock c.L and suspends the calling goroutine, so the current goroutine blocks at the Wait call.
    If another goroutine calls Signal or Broadcast to wake it, Wait re-acquires c.L before it returns, and execution continues after the Wait.
    func (c *Cond) Wait()

func main() {
    rand.Seed(time.Now().UnixNano())

    count := 0
    finished := 0
    var mu sync.Mutex
    cond := sync.NewCond(&mu)

    for i := 0; i < 10; i++ {
        go func() {
            vote := requestVote()
            mu.Lock()
            defer mu.Unlock()
            if vote {
                count++
            }
            finished++
            cond.Broadcast()
        }()
    }

    mu.Lock()
    for count < 5 && finished != 10 {
        cond.Wait() // atomically releases the lock and sleeps; re-acquires the lock before returning
    }
    if count >= 5 {
        println("received 5+ votes!")
    } else {
        println("lost")
    }
    mu.Unlock()
}

An example where channels don't work as well:

func main() {
    rand.Seed(time.Now().UnixNano())

    count := 0
    finished := 0
    ch := make(chan bool)
    for i := 0; i < 10; i++ {
        go func() {
            ch <- requestVote()
        }()
    }
    for count < 5 && finished < 10 {
        v := <-ch
        if v {
            count += 1
        }
        finished += 1
    }
    if count >= 5 {
        println("received 5+ votes!")
    } else {
        println("lost")
    }
}

The problem: if the first 5 votes are all yes, main stops receiving from ch, so the remaining goroutines block forever on their sends (a goroutine leak).

Improvement: have main drain all 10 sends, so no sender stays blocked:

for i := 0; i < 10; i++ {
    v := <-ch
    if v {
        count += 1
    }
}

Remote Procedure Call (RPC)

  • Goal: RPC behaves like a local procedure call
  • hide details of network protocols
  • convert data (strings, arrays, maps, &c) to “wire format”
  • portability / interoperability
  • Software structure ("fns" = functions)

        client app         handler fns
        stub fns           dispatcher
        RPC lib            RPC lib
        net ------------   net

    (left column: client side; right column: server side)

Go example: kv.go on schedule page

A toy key/value storage server – Put(key,value), Get(key)->value
Uses Go’s RPC library

Common:

Declare Args and Reply struct for each server handler.

Client:

  • connect()'s Dial() creates a TCP connection to the server
  • get() and put() are client “stubs”
  • Call() asks the RPC library to perform the call
    • you specify server function name, arguments, place to put reply
    • library marshalls args, sends request, waits, unmarshalls reply
    • return value from Call() indicates whether it got a reply
    • usually you’ll also have a reply.Err indicating service-level failure
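
A minimal client-side sketch in the spirit of kv.go, using Go's net/rpc; the struct shapes, server address, and method name here are assumptions, not necessarily the course file's exact code:

package main

import (
    "fmt"
    "net/rpc"
)

// Args and Reply structs for the Get handler (assumed shapes).
type GetArgs struct{ Key string }
type GetReply struct{ Value string }

// get is a client stub: it asks the RPC library to call KV.Get on the server.
func get(key string) string {
    client, err := rpc.Dial("tcp", "localhost:1234") // what connect() does in kv.go
    if err != nil {
        return ""
    }
    defer client.Close()
    args := GetArgs{Key: key}
    reply := GetReply{}
    // Call marshals args, sends the request, waits, and unmarshals the reply;
    // its return value says whether a reply came back at all.
    if err := client.Call("KV.Get", &args, &reply); err != nil {
        fmt.Println("Call failed:", err)
        return ""
    }
    return reply.Value
}

func main() {
    fmt.Println(get("subject"))
}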

Server:

  • Go requires server to declare an object with methods as RPC handlers
    Server then registers that object with the RPC library

  • Server accepts TCP connections, gives them to RPC library

  • The RPC library

    • reads each request
    • creates a new goroutine for this request
    • unmarshalls request
    • looks up the named object (in a table created by Register())
    • calls the object’s named method (dispatch)
    • marshalls reply
    • writes reply on TCP connection
  • The server’s Get() and Put() handlers

    • Must lock, since RPC library creates a new goroutine for each request

      (i.e., must take the lock when reading or modifying shared state)

    • read args; modify reply

    • Note: state-oriented implementation
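
And a matching server-side sketch, again hedged: the KV type and its fields are assumptions in the spirit of kv.go, not a copy of it:

package main

import (
    "net"
    "net/rpc"
    "sync"
)

type GetArgs struct{ Key string }
type GetReply struct{ Value string }

type KV struct {
    mu   sync.Mutex // needed: the RPC library runs each request in its own goroutine
    data map[string]string
}

// Get reads args and modifies reply (the state-oriented style noted above).
func (kv *KV) Get(args *GetArgs, reply *GetReply) error {
    kv.mu.Lock()
    defer kv.mu.Unlock()
    reply.Value = kv.data[args.Key]
    return nil
}

func main() {
    kv := &KV{data: make(map[string]string)}
    server := rpc.NewServer()
    server.Register(kv) // register the object whose methods are the RPC handlers
    l, err := net.Listen("tcp", ":1234")
    if err != nil {
        return
    }
    server.Accept(l) // accept TCP connections and hand them to the RPC library
}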

A few details:

  • Binding: how does client know what server computer to talk to?

    • For Go’s RPC, server name/port is an argument to Dial
    • Big systems have some kind of name or configuration server
  • Marshalling: format data into packets

    • Go’s RPC library can pass strings, arrays, objects, maps, &c (“&c” = et cetera)

    • Go passes pointers by copying the pointed-to data

    • Cannot pass channels or functions

    • Marshals only exported fields (i.e., ones whose names start with a CAPITAL letter)
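
A tiny illustration of the exported-fields rule (hypothetical struct; Go's gob-based marshalling encodes only exported struct fields):

type Point struct {
    X int // exported (capital X): marshalled and sent to the peer
    y int // unexported: silently ignored during marshalling
}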

Simplest failure-handling scheme: “best-effort RPC”

  • Call() waits for response for a while
  • If none arrives, re-send the request
  • Do this a few times
  • Then give up and return an error
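
A hedged sketch of that scheme; send() is an assumed helper that makes one attempt and waits a while for the response:

// bestEffortCall re-sends a few times, then gives up with an error.
func bestEffortCall(send func() error) error {
    for try := 0; try < 3; try++ { // do this a few times
        if err := send(); err == nil {
            return nil // got a reply
        }
        // no reply within the timeout: loop around and re-send the request
    }
    return errors.New("best-effort RPC: giving up")
}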

Better RPC behavior: “at-most-once RPC”

  • client re-sends if no answer;
  • server RPC code detects duplicate requests,
  • returns previous reply instead of re-running handler

Q: how to detect a duplicate request?

  • client includes unique ID (XID) with each request
    uses same XID for re-send
  • server:

        if seen[xid]:
            r = old[xid]
        else:
            r = handler()
            old[xid] = r
            seen[xid] = true
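
A possible Go rendering of that table, hedged: all names (dedupServer, Reply, dispatch) are assumptions. The mutex matters because the RPC library runs each request in its own goroutine; holding it across handler() is simplistic (it serializes requests) but keeps the check-then-record step atomic:

type Reply string // placeholder reply type

type dedupServer struct {
    mu   sync.Mutex
    seen map[int64]bool  // XIDs already executed
    old  map[int64]Reply // saved replies, returned to duplicate requests
}

func (s *dedupServer) dispatch(xid int64, handler func() Reply) Reply {
    s.mu.Lock()
    defer s.mu.Unlock()
    if s.seen[xid] {
        return s.old[xid] // duplicate: return previous reply, don't re-run
    }
    r := handler()
    s.old[xid] = r
    s.seen[xid] = true
    return r
}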

some at-most-once complexities

  • this will come up in lab 3
  • what if two clients use the same XID?
    big random number?
  • how to avoid a huge seen[xid] table?
    idea:
    each client has a unique ID (perhaps a big random number)
    per-client RPC sequence numbers
    client includes “seen all replies <= X” with every RPC
    much like TCP sequence #s and acks
    then server can keep O(# clients) state, rather than O(# XIDs)
  • server must eventually discard info about old RPCs or old clients
    when is discard safe?
  • how to handle dup req while original is still executing?
    server doesn’t know reply yet
    idea: “pending” flag per executing RPC; wait or ignore

Still not clear to me; will revisit in lab 3.

Go RPC is a simple form of “at-most-once”

  • open TCP connection
  • write request to TCP connection
  • Go RPC never re-sends a request
    So server won’t see duplicate requests
  • Go RPC code returns an error if it doesn’t get a reply
    • perhaps after a timeout (from TCP)
    • perhaps the server didn’t see the request
    • perhaps the server processed the request, but the server/net failed before the reply came back