如何在Go中同步线程

Golang大师课程(8部分系列)
如何Go:基础设施的宠儿
Golang大师课程:接口驱动设计
Golang大师课程:结构体嵌入
Golang大师课程:函数式选项
Golang大师课程:依赖注入(DI)
Golang大师课程:单例结构体将拯救你的项目
如何在Golang中构建美丽的GUI:3种Web UI路径
如何在Go中同步线程
单线程代码已经带来头痛。添加第二个线程,就是从基础头痛升级了。
解决方案?互斥锁:线程和数据的交通警察。
一旦你理解了它们,线程同步就变成了第二本能,语言无关。
在C++和Go中工作,我遇到过所有常见的混乱:
有时会吞噬数据的竞态条件
线程践踏内存导致的段错误
还有沉默的杀手:死锁
最后一个是最糟糕的,没有崩溃,没有错误。只是一个死程序,卡在永恒的线程对峙中。
但当你理解了互斥锁背后的核心思想时,一切都开始变得清晰。
最好的部分?每种语言都说互斥锁:
Go sync.Mutex
C++ std::mutex
Python threading.Lock()
Java ReentrantLock
在这篇文章中,我将分解互斥锁的概念,展示死锁是如何发生的,并给你足够的直觉来处理任何语言中的线程代码。
学一次 到处应用。
互斥锁:互斥锁
线程引入了全新类别的问题,特别是在Go中,生成数千个线程实际上是免费的。
现在想象两个线程在完全相同的时间击中同一个数据源。那就是混乱。竞态条件、数据损坏、神秘bug,你不想调试的东西,更不用说向你的团队解释了。
进入互斥锁: 线程和共享数据之间的交通警察。
没有锁:
  1. 线程A ---> 数据源 <--- 线程B
有锁(两个线程之间共享):
  1. 线程A []---> 数据源 <---[] 线程B
互斥锁的工作很简单:一次只有一个线程进入。
如果线程A拥有锁,线程B被告知:"等待你的轮次。"
这里是一个简单的切片访问示例,有和没有锁:
没有锁:
  1. package main

  2. import (
  3. "fmt"
  4. "time"
  5. )

  6. func main() {
  7. var numbers []int

  8. // 启动5个goroutine,都向同一个切片追加
  9. for i := 0; i < 5; i++ {
  10. go func(n int) {
  11. // 这里没有锁定,这很可能会导致数据竞争
  12. numbers = append(numbers, n)
  13. fmt.Println("追加", n, "→", numbers)
  14. }(i)
  15. }

  16. // 给它们一点时间运行
  17. time.Sleep(1 * time.Second)
  18. }
go
有锁:
  1. package main

  2. import (
  3. "fmt"
  4. "sync"
  5. "time"
  6. )

  7. func main() {
  8. var (
  9. numbers []int
  10. mu sync.Mutex
  11. )

  12. for i := 0; i < 5; i++ {
  13. go func(n int) {
  14. mu.Lock() // 获取锁
  15. defer mu.Unlock() // 确保它被释放,即使在panic时

  16. numbers = append(numbers, n)
  17. fmt.Println("追加", n, "→", numbers)
  18. }(i)
  19. }

  20. time.Sleep(1 * time.Second)
  21. }
go
注意我们如何做:
  1. mu.Lock()
  2. defer mu.Unlock()
go
defer保证无论我们如何退出那个goroutine,正常返回或panic,锁都会被释放。
一旦goroutine接触共享数据,就锁定它。 相信我,未来的你会感激的。
那么,死锁到底是什么?
死锁
回到我们的交通警察类比:
  1. 线程A []---> 数据源 <---[] 线程B
这有效是因为一个共享锁控制访问。但当我们引入同一车道中的两个共享锁时会发生什么?
  1. 线程A []--[]-> 数据源 <---[] 线程B
现在你有两个交通警察,都不知道谁负责。线程A卡在等待两者,永远在困惑中来回弹跳。这就是经典死锁。
通常的嫌疑人?相同的嵌套锁,调用一个从已经持有锁的另一个函数内部获取锁的函数。
这是一个真实世界的例子:
  1. func (m *ScheduledTask) Create(...) (task, error) {
  2. m.mu.Lock() // 锁1
  3. defer m.mu.Unlock() // 在结束时解锁1

  4. // ... 设置任务 ...

  5. if err := m.saveTasks(); err != nil { // 内部的锁2
  6. return task{}, err
  7. }

  8. return t, nil
  9. }
go
现在看看saveTasks内部:
  1. func (m *ScheduledTask) saveTasks() error {
  2. m.mu.Lock() // 锁2(再次)
  3. defer m.mu.Unlock()

  4. data, err := json.MarshalIndent(m.tasks, "", " ")
  5. if err != nil {
  6. return err
  7. }

  8. return os.WriteFile(tasks, data, 0644)
  9. }
go
死锁。
为什么?因为Create()已经持有锁,而saveTasks()试图再次获取它,第一个被释放之前。Go例程不会抱怨,它们只是默默地冻结。没有崩溃,没有堆栈跟踪,只是一个僵尸线程吞噬资源。
主线程呢?完全不知道。继续运行,而你的程序挂在边缘。
如果你认真对待构建真实世界的软件,你需要理解同步。
这些概念适用于所有语言。这是C++版本:
  1. std::lock_guard<std::mutex> lk(globalIPCData.mapMutex); // 在访问前锁定
  2. UIelement& u = uiSet.get(entityId);
cpp
学好这个。
一旦你将互斥锁视为具有绝对权威的交通警察,大多数线程问题就消失了。
我将在Substack上发布更多关于后端主题、JavaScript、Golang、C++和低级系统的深度探讨。希望你在那里;来打个招呼:
Coffee & Kernels
更多内容:
深度学习指南
深度学习搭便车指南:Python和JS示例。
深度学习:Pytorch和Tensorflow.js示例。
如何不那么糟糕地使用数据库
如何不那么糟糕地使用数据库和数据系统,带JavaScript示例。
数据库、数据系统和意图语言。
多语言互斥锁示例
C++ std::mutex
  1. #include <iostream>
  2. #include <mutex>
  3. #include <thread>
  4. #include <vector>

  5. int main() {
  6. int counter = 0;
  7. std::mutex m;
  8. std::vector<std::thread> threads;

  9. for (int i = 0; i < 5; ++i) {
  10. threads.emplace_back([&](){
  11. std::lock_guard<std::mutex> lock(m); // RAII:构造函数锁定,析构函数解锁
  12. ++counter; // 临界区
  13. std::cout << "C++计数器: " << counter << "\n";
  14. });
  15. }

  16. for (auto &t : threads) t.join();
  17. return 0;
  18. }
cpp
Python threading.Lock()
  1. import threading

  2. counter = 0
  3. lock = threading.Lock()

  4. def worker():
  5. global counter
  6. with lock: # 上下文管理器获取和释放
  7. counter += 1 # 临界区
  8. print(f"Python计数器: {counter}")

  9. threads = []
  10. for _ in range(5):
  11. t = threading.Thread(target=worker)
  12. t.start()
  13. threads.append(t)

  14. for t in threads:
  15. t.join()
python
Java ReentrantLock
  1. import java.util.concurrent.locks.ReentrantLock;

  2. public class Main {
  3. private static int counter = 0;
  4. private static final ReentrantLock lock = new ReentrantLock();

  5. public static void main(String[] args) throws InterruptedException {
  6. Thread[] threads = new Thread[5];

  7. for (int i = 0; i < 5; i++) {
  8. threads[i] = new Thread(() -> {
  9. lock.lock(); // 🔒 获取
  10. try {
  11. counter++; // 临界区
  12. System.out.println("Java计数器: " + counter);
  13. } finally {
  14. lock.unlock(); // 🔓 释放
  15. }
  16. });
  17. threads[i].start();
  18. }

  19. for (Thread t : threads) t.join();
  20. }
  21. }
java
🔑 关键要点
无论什么语言,配方都是:
获取锁/互斥锁,然后接触共享数据
你最小的临界工作
释放锁/互斥锁(或使用作用域/RAII/上下文自动释放)
实际应用示例
1. 银行账户示例
  1. package main

  2. import (
  3. "fmt"
  4. "sync"
  5. "time"
  6. )

  7. type BankAccount struct {
  8. balance int
  9. mu sync.Mutex
  10. }

  11. func (ba *BankAccount) Deposit(amount int) {
  12. ba.mu.Lock()
  13. defer ba.mu.Unlock()
  14. ba.balance += amount
  15. fmt.Printf("存款 %d,余额: %d\n", amount, ba.balance)
  16. }

  17. func (ba *BankAccount) Withdraw(amount int) bool {
  18. ba.mu.Lock()
  19. defer ba.mu.Unlock()
  20. if ba.balance >= amount {
  21. ba.balance -= amount
  22. fmt.Printf("取款 %d,余额: %d\n", amount, ba.balance)
  23. return true
  24. }
  25. fmt.Printf("余额不足,无法取款 %d\n", amount)
  26. return false
  27. }

  28. func (ba *BankAccount) GetBalance() int {
  29. ba.mu.Lock()
  30. defer ba.mu.Unlock()
  31. return ba.balance
  32. }

  33. func main() {
  34. account := &BankAccount{balance: 1000}
  35. // 启动多个goroutine同时操作账户
  36. for i := 0; i < 5; i++ {
  37. go func(id int) {
  38. account.Deposit(100)
  39. time.Sleep(10 * time.Millisecond)
  40. account.Withdraw(50)
  41. }(i)
  42. }
  43. time.Sleep(1 * time.Second)
  44. fmt.Printf("最终余额: %d\n", account.GetBalance())
  45. }
go
2. 缓存示例
  1. package main

  2. import (
  3. "fmt"
  4. "sync"
  5. "time"
  6. )

  7. type Cache struct {
  8. data map[string]interface{}
  9. mu sync.RWMutex // 读写锁,允许多个读取者
  10. }

  11. func NewCache() *Cache {
  12. return &Cache{
  13. data: make(map[string]interface{}),
  14. }
  15. }

  16. func (c *Cache) Set(key string, value interface{}) {
  17. c.mu.Lock()
  18. defer c.mu.Unlock()
  19. c.data[key] = value
  20. fmt.Printf("设置 %s = %v\n", key, value)
  21. }

  22. func (c *Cache) Get(key string) (interface{}, bool) {
  23. c.mu.RLock() // 读锁,允许多个goroutine同时读取
  24. defer c.mu.RUnlock()
  25. value, exists := c.data[key]
  26. if exists {
  27. fmt.Printf("获取 %s = %v\n", key, value)
  28. } else {
  29. fmt.Printf("键 %s 不存在\n", key)
  30. }
  31. return value, exists
  32. }

  33. func (c *Cache) Delete(key string) {
  34. c.mu.Lock()
  35. defer c.mu.Unlock()
  36. delete(c.data, key)
  37. fmt.Printf("删除键 %s\n", key)
  38. }

  39. func main() {
  40. cache := NewCache()
  41. // 启动多个读取者
  42. for i := 0; i < 3; i++ {
  43. go func(id int) {
  44. for j := 0; j < 5; j++ {
  45. cache.Get("key1")
  46. time.Sleep(50 * time.Millisecond)
  47. }
  48. }(i)
  49. }
  50. // 启动写入者
  51. go func() {
  52. for i := 0; i < 3; i++ {
  53. cache.Set("key1", fmt.Sprintf("value%d", i))
  54. time.Sleep(100 * time.Millisecond)
  55. }
  56. }()
  57. time.Sleep(1 * time.Second)
  58. }
go
3. 工作池示例
  1. package main

  2. import (
  3. "fmt"
  4. "sync"
  5. "time"
  6. )

  7. type WorkerPool struct {
  8. jobs chan int
  9. results chan int
  10. wg sync.WaitGroup
  11. }

  12. func NewWorkerPool(numWorkers int) *WorkerPool {
  13. return &WorkerPool{
  14. jobs: make(chan int, 100),
  15. results: make(chan int, 100),
  16. }
  17. }

  18. func (wp *WorkerPool) Start(numWorkers int) {
  19. for i := 0; i < numWorkers; i++ {
  20. wp.wg.Add(1)
  21. go wp.worker(i)
  22. }
  23. }

  24. func (wp *WorkerPool) worker(id int) {
  25. defer wp.wg.Done()
  26. for job := range wp.jobs {
  27. fmt.Printf("工作者 %d 处理任务 %d\n", id, job)
  28. time.Sleep(100 * time.Millisecond) // 模拟工作
  29. wp.results <- job * 2 // 返回结果
  30. }
  31. }

  32. func (wp *WorkerPool) AddJob(job int) {
  33. wp.jobs <- job
  34. }

  35. func (wp *WorkerPool) Close() {
  36. close(wp.jobs)
  37. wp.wg.Wait()
  38. close(wp.results)
  39. }

  40. func main() {
  41. pool := NewWorkerPool(3)
  42. pool.Start(3)
  43. // 添加任务
  44. for i := 1; i <= 10; i++ {
  45. pool.AddJob(i)
  46. }
  47. // 关闭工作池并等待完成
  48. go func() {
  49. pool.Close()
  50. }()
  51. // 收集结果
  52. for result := range pool.results {
  53. fmt.Printf("收到结果: %d\n", result)
  54. }
  55. }
go
4. 条件变量示例
  1. package main

  2. import (
  3. "fmt"
  4. "sync"
  5. "time"
  6. )

  7. type ProducerConsumer struct {
  8. buffer []int
  9. mu sync.Mutex
  10. cond *sync.Cond
  11. size int
  12. }

  13. func NewProducerConsumer(size int) *ProducerConsumer {
  14. pc := &ProducerConsumer{
  15. buffer: make([]int, 0, size),
  16. size: size,
  17. }
  18. pc.cond = sync.NewCond(&pc.mu)
  19. return pc
  20. }

  21. func (pc *ProducerConsumer) Produce(item int) {
  22. pc.mu.Lock()
  23. defer pc.mu.Unlock()
  24. // 等待缓冲区有空间
  25. for len(pc.buffer) >= pc.size {
  26. fmt.Printf("生产者等待,缓冲区已满\n")
  27. pc.cond.Wait()
  28. }
  29. pc.buffer = append(pc.buffer, item)
  30. fmt.Printf("生产: %d,缓冲区大小: %d\n", item, len(pc.buffer))
  31. // 通知消费者
  32. pc.cond.Signal()
  33. }

  34. func (pc *ProducerConsumer) Consume() int {
  35. pc.mu.Lock()
  36. defer pc.mu.Unlock()
  37. // 等待缓冲区有数据
  38. for len(pc.buffer) == 0 {
  39. fmt.Printf("消费者等待,缓冲区为空\n")
  40. pc.cond.Wait()
  41. }
  42. item := pc.buffer[0]
  43. pc.buffer = pc.buffer[1:]
  44. fmt.Printf("消费: %d,缓冲区大小: %d\n", item, len(pc.buffer))
  45. // 通知生产者
  46. pc.cond.Signal()
  47. return item
  48. }

  49. func main() {
  50. pc := NewProducerConsumer(3)
  51. // 启动生产者
  52. go func() {
  53. for i := 1; i <= 10; i++ {
  54. pc.Produce(i)
  55. time.Sleep(200 * time.Millisecond)
  56. }
  57. }()
  58. // 启动消费者
  59. go func() {
  60. for i := 0; i < 10; i++ {
  61. pc.Consume()
  62. time.Sleep(300 * time.Millisecond)
  63. }
  64. }()
  65. time.Sleep(5 * time.Second)
  66. }
go
最佳实践
1. 锁的粒度
  1. // 不好的做法:锁太大
  2. type BadExample struct {
  3. mu sync.Mutex
  4. data1 map[string]int
  5. data2 map[string]int
  6. }

  7. func (b *BadExample) UpdateData1(key string, value int) {
  8. b.mu.Lock()
  9. defer b.mu.Unlock()
  10. b.data1[key] = value // 只需要锁定data1
  11. // 但锁住了整个结构体
  12. }

  13. // 好的做法:细粒度锁
  14. type GoodExample struct {
  15. mu1 sync.Mutex
  16. mu2 sync.Mutex
  17. data1 map[string]int
  18. data2 map[string]int
  19. }

  20. func (g *GoodExample) UpdateData1(key string, value int) {
  21. g.mu1.Lock()
  22. defer g.mu1.Unlock()
  23. g.data1[key] = value // 只锁定需要的部分
  24. }
go
2. 避免死锁
  1. // 可能导致死锁的代码
  2. func (s *Service) Method1() {
  3. s.mu1.Lock()
  4. defer s.mu1.Unlock()
  5. s.Method2() // 可能尝试获取mu1,导致死锁
  6. }

  7. func (s *Service) Method2() {
  8. s.mu1.Lock() // 死锁!
  9. defer s.mu1.Unlock()
  10. }

  11. // 解决方案:使用sync.RWMutex或重新设计
  12. type Service struct {
  13. mu sync.RWMutex
  14. }

  15. func (s *Service) Method1() {
  16. s.mu.Lock()
  17. defer s.mu.Unlock()
  18. s.Method2() // 现在安全了
  19. }

  20. func (s *Service) Method2() {
  21. s.mu.RLock() // 读锁,不会死锁
  22. defer s.mu.RUnlock()
  23. }
go
3. 使用sync.Once
  1. package main

  2. import (
  3. "fmt"
  4. "sync"
  5. )

  6. type Singleton struct {
  7. data string
  8. }

  9. var (
  10. instance *Singleton
  11. once sync.Once
  12. )

  13. func GetInstance() *Singleton {
  14. once.Do(func() {
  15. instance = &Singleton{data: "单例数据"}
  16. fmt.Println("创建单例实例")
  17. })
  18. return instance
  19. }

  20. func main() {
  21. // 多个goroutine同时调用
  22. for i := 0; i < 5; i++ {
  23. go func(id int) {
  24. instance := GetInstance()
  25. fmt.Printf("Goroutine %d 获取实例: %v\n", id, instance)
  26. }(i)
  27. }
  28. // 等待所有goroutine完成
  29. time.Sleep(1 * time.Second)
  30. }
go
性能考虑
1. 锁竞争
  1. // 高竞争情况
  2. func HighContention() {
  3. var counter int
  4. var mu sync.Mutex
  5. for i := 0; i < 1000; i++ {
  6. go func() {
  7. mu.Lock()
  8. counter++
  9. mu.Unlock()
  10. }()
  11. }
  12. }

  13. // 减少竞争:使用原子操作
  14. import "sync/atomic"

  15. func LowContention() {
  16. var counter int64
  17. for i := 0; i < 1000; i++ {
  18. go func() {
  19. atomic.AddInt64(&counter, 1)
  20. }()
  21. }
  22. }
go
2. 读写锁优化
  1. type OptimizedCache struct {
  2. data map[string]interface{}
  3. mu sync.RWMutex
  4. }

  5. func (c *OptimizedCache) Get(key string) (interface{}, bool) {
  6. c.mu.RLock() // 多个读取者可以同时访问
  7. defer c.mu.RUnlock()
  8. return c.data[key], true
  9. }

  10. func (c *OptimizedCache) Set(key string, value interface{}) {
  11. c.mu.Lock() // 写入者独占访问
  12. defer c.mu.Unlock()
  13. c.data[key] = value
  14. }
go
调试技巧
1. 使用race detector
  1. go run -race your_program.go
bash
2. 添加调试信息
  1. type DebugMutex struct {
  2. sync.Mutex
  3. name string
  4. }

  5. func (dm *DebugMutex) Lock() {
  6. fmt.Printf("尝试锁定: %s\n", dm.name)
  7. dm.Mutex.Lock()
  8. fmt.Printf("已锁定: %s\n", dm.name)
  9. }

  10. func (dm *DebugMutex) Unlock() {
  11. fmt.Printf("解锁: %s\n", dm.name)
  12. dm.Mutex.Unlock()
  13. }
go