如何在Go中同步线程
Golang大师课程(8部分系列)
如何Go:基础设施的宠儿
Golang大师课程:接口驱动设计
Golang大师课程:结构体嵌入
Golang大师课程:函数式选项
Golang大师课程:依赖注入(DI)
Golang大师课程:单例结构体将拯救你的项目
如何在Golang中构建美丽的GUI:3种Web UI路径
如何在Go中同步线程
单线程代码已经带来头痛。添加第二个线程,就是从基础头痛升级了。
解决方案?互斥锁:线程和数据的交通警察。
一旦你理解了它们,线程同步就变成了第二本能,语言无关。
在C++和Go中工作,我遇到过所有常见的混乱:
有时会吞噬数据的竞态条件
线程践踏内存导致的段错误
还有沉默的杀手:死锁
最后一个是最糟糕的,没有崩溃,没有错误。只是一个死程序,卡在永恒的线程对峙中。
但当你理解了互斥锁背后的核心思想时,一切都开始变得清晰。
最好的部分?每种语言都说互斥锁:
Go → sync.Mutex
C++ → std::mutex
Python → threading.Lock()
Java → ReentrantLock
在这篇文章中,我将分解互斥锁的概念,展示死锁是如何发生的,并给你足够的直觉来处理任何语言中的线程代码。
学一次 → 到处应用。
互斥锁:互斥锁
线程引入了全新类别的问题,特别是在Go中,生成数千个线程实际上是免费的。
现在想象两个线程在完全相同的时间击中同一个数据源。那就是混乱。竞态条件、数据损坏、神秘bug,你不想调试的东西,更不用说向你的团队解释了。
进入互斥锁: 线程和共享数据之间的交通警察。
没有锁:
- 线程A ---> 数据源 <--- 线程B
有锁(两个线程之间共享):
- 线程A [锁]---> 数据源 <---[锁] 线程B
互斥锁的工作很简单:一次只有一个线程进入。
如果线程A拥有锁,线程B被告知:"等待你的轮次。"
这里是一个简单的切片访问示例,有和没有锁:
没有锁:
- package main
- import (
- "fmt"
- "time"
- )
- func main() {
- var numbers []int
- // 启动5个goroutine,都向同一个切片追加
- for i := 0; i < 5; i++ {
- go func(n int) {
- // 这里没有锁定,这很可能会导致数据竞争
- numbers = append(numbers, n)
- fmt.Println("追加", n, "→", numbers)
- }(i)
- }
- // 给它们一点时间运行
- time.Sleep(1 * time.Second)
- }
有锁:
- package main
- import (
- "fmt"
- "sync"
- "time"
- )
- func main() {
- var (
- numbers []int
- mu sync.Mutex
- )
- for i := 0; i < 5; i++ {
- go func(n int) {
- mu.Lock() // 获取锁
- defer mu.Unlock() // 确保它被释放,即使在panic时
- numbers = append(numbers, n)
- fmt.Println("追加", n, "→", numbers)
- }(i)
- }
- time.Sleep(1 * time.Second)
- }
注意我们如何做:
- mu.Lock()
- defer mu.Unlock()
defer保证无论我们如何退出那个goroutine,正常返回或panic,锁都会被释放。
一旦goroutine接触共享数据,就锁定它。 相信我,未来的你会感激的。
那么,死锁到底是什么?
死锁
回到我们的交通警察类比:
- 线程A [锁]---> 数据源 <---[锁] 线程B
这有效是因为一个共享锁控制访问。但当我们引入同一车道中的两个共享锁时会发生什么?
- 线程A [锁]--[锁]-> 数据源 <---[锁] 线程B
现在你有两个交通警察,都不知道谁负责。线程A卡在等待两者,永远在困惑中来回弹跳。这就是经典死锁。
通常的嫌疑人?相同的嵌套锁,调用一个从已经持有锁的另一个函数内部获取锁的函数。
这是一个真实世界的例子:
- func (m *ScheduledTask) Create(...) (task, error) {
- m.mu.Lock() // 锁1
- defer m.mu.Unlock() // 在结束时解锁1
- // ... 设置任务 ...
- if err := m.saveTasks(); err != nil { // 内部的锁2
- return task{}, err
- }
- return t, nil
- }
现在看看saveTasks内部:
- func (m *ScheduledTask) saveTasks() error {
- m.mu.Lock() // 锁2(再次)
- defer m.mu.Unlock()
- data, err := json.MarshalIndent(m.tasks, "", " ")
- if err != nil {
- return err
- }
- return os.WriteFile(tasks, data, 0644)
- }
死锁。
为什么?因为Create()已经持有锁,而saveTasks()试图再次获取它,在第一个被释放之前。Go例程不会抱怨,它们只是默默地冻结。没有崩溃,没有堆栈跟踪,只是一个僵尸线程吞噬资源。
主线程呢?完全不知道。继续运行,而你的程序挂在边缘。
如果你认真对待构建真实世界的软件,你需要理解同步。
这些概念适用于所有语言。这是C++版本:
- std::lock_guard<std::mutex> lk(globalIPCData.mapMutex); // 在访问前锁定
- UIelement& u = uiSet.get(entityId);
学好这个。
一旦你将互斥锁视为具有绝对权威的交通警察,大多数线程问题就消失了。
我将在Substack上发布更多关于后端主题、JavaScript、Golang、C++和低级系统的深度探讨。希望你在那里;来打个招呼:
Coffee & Kernels
更多内容:
深度学习指南
深度学习搭便车指南:Python和JS示例。
深度学习:Pytorch和Tensorflow.js示例。
如何不那么糟糕地使用数据库
如何不那么糟糕地使用数据库和数据系统,带JavaScript示例。
数据库、数据系统和意图语言。
多语言互斥锁示例
C++ → std::mutex
- #include <iostream>
- #include <mutex>
- #include <thread>
- #include <vector>
- int main() {
- int counter = 0;
- std::mutex m;
- std::vector<std::thread> threads;
- for (int i = 0; i < 5; ++i) {
- threads.emplace_back([&](){
- std::lock_guard<std::mutex> lock(m); // RAII:构造函数锁定,析构函数解锁
- ++counter; // 临界区
- std::cout << "C++计数器: " << counter << "\n";
- });
- }
- for (auto &t : threads) t.join();
- return 0;
- }
Python → threading.Lock()
- import threading
- counter = 0
- lock = threading.Lock()
- def worker():
- global counter
- with lock: # 上下文管理器获取和释放
- counter += 1 # 临界区
- print(f"Python计数器: {counter}")
- threads = []
- for _ in range(5):
- t = threading.Thread(target=worker)
- t.start()
- threads.append(t)
- for t in threads:
- t.join()
Java → ReentrantLock
- import java.util.concurrent.locks.ReentrantLock;
- public class Main {
- private static int counter = 0;
- private static final ReentrantLock lock = new ReentrantLock();
- public static void main(String[] args) throws InterruptedException {
- Thread[] threads = new Thread[5];
- for (int i = 0; i < 5; i++) {
- threads[i] = new Thread(() -> {
- lock.lock(); // 🔒 获取
- try {
- counter++; // 临界区
- System.out.println("Java计数器: " + counter);
- } finally {
- lock.unlock(); // 🔓 释放
- }
- });
- threads[i].start();
- }
- for (Thread t : threads) t.join();
- }
- }
🔑 关键要点
无论什么语言,配方都是:
获取锁/互斥锁,然后接触共享数据
做你最小的临界工作
释放锁/互斥锁(或使用作用域/RAII/上下文自动释放)
实际应用示例
1. 银行账户示例
- package main
- import (
- "fmt"
- "sync"
- "time"
- )
- type BankAccount struct {
- balance int
- mu sync.Mutex
- }
- func (ba *BankAccount) Deposit(amount int) {
- ba.mu.Lock()
- defer ba.mu.Unlock()
- ba.balance += amount
- fmt.Printf("存款 %d,余额: %d\n", amount, ba.balance)
- }
- func (ba *BankAccount) Withdraw(amount int) bool {
- ba.mu.Lock()
- defer ba.mu.Unlock()
- if ba.balance >= amount {
- ba.balance -= amount
- fmt.Printf("取款 %d,余额: %d\n", amount, ba.balance)
- return true
- }
- fmt.Printf("余额不足,无法取款 %d\n", amount)
- return false
- }
- func (ba *BankAccount) GetBalance() int {
- ba.mu.Lock()
- defer ba.mu.Unlock()
- return ba.balance
- }
- func main() {
- account := &BankAccount{balance: 1000}
- // 启动多个goroutine同时操作账户
- for i := 0; i < 5; i++ {
- go func(id int) {
- account.Deposit(100)
- time.Sleep(10 * time.Millisecond)
- account.Withdraw(50)
- }(i)
- }
- time.Sleep(1 * time.Second)
- fmt.Printf("最终余额: %d\n", account.GetBalance())
- }
2. 缓存示例
- package main
- import (
- "fmt"
- "sync"
- "time"
- )
- type Cache struct {
- data map[string]interface{}
- mu sync.RWMutex // 读写锁,允许多个读取者
- }
- func NewCache() *Cache {
- return &Cache{
- data: make(map[string]interface{}),
- }
- }
- func (c *Cache) Set(key string, value interface{}) {
- c.mu.Lock()
- defer c.mu.Unlock()
- c.data[key] = value
- fmt.Printf("设置 %s = %v\n", key, value)
- }
- func (c *Cache) Get(key string) (interface{}, bool) {
- c.mu.RLock() // 读锁,允许多个goroutine同时读取
- defer c.mu.RUnlock()
- value, exists := c.data[key]
- if exists {
- fmt.Printf("获取 %s = %v\n", key, value)
- } else {
- fmt.Printf("键 %s 不存在\n", key)
- }
- return value, exists
- }
- func (c *Cache) Delete(key string) {
- c.mu.Lock()
- defer c.mu.Unlock()
- delete(c.data, key)
- fmt.Printf("删除键 %s\n", key)
- }
- func main() {
- cache := NewCache()
- // 启动多个读取者
- for i := 0; i < 3; i++ {
- go func(id int) {
- for j := 0; j < 5; j++ {
- cache.Get("key1")
- time.Sleep(50 * time.Millisecond)
- }
- }(i)
- }
- // 启动写入者
- go func() {
- for i := 0; i < 3; i++ {
- cache.Set("key1", fmt.Sprintf("value%d", i))
- time.Sleep(100 * time.Millisecond)
- }
- }()
- time.Sleep(1 * time.Second)
- }
3. 工作池示例
- package main
- import (
- "fmt"
- "sync"
- "time"
- )
- type WorkerPool struct {
- jobs chan int
- results chan int
- wg sync.WaitGroup
- }
- func NewWorkerPool(numWorkers int) *WorkerPool {
- return &WorkerPool{
- jobs: make(chan int, 100),
- results: make(chan int, 100),
- }
- }
- func (wp *WorkerPool) Start(numWorkers int) {
- for i := 0; i < numWorkers; i++ {
- wp.wg.Add(1)
- go wp.worker(i)
- }
- }
- func (wp *WorkerPool) worker(id int) {
- defer wp.wg.Done()
- for job := range wp.jobs {
- fmt.Printf("工作者 %d 处理任务 %d\n", id, job)
- time.Sleep(100 * time.Millisecond) // 模拟工作
- wp.results <- job * 2 // 返回结果
- }
- }
- func (wp *WorkerPool) AddJob(job int) {
- wp.jobs <- job
- }
- func (wp *WorkerPool) Close() {
- close(wp.jobs)
- wp.wg.Wait()
- close(wp.results)
- }
- func main() {
- pool := NewWorkerPool(3)
- pool.Start(3)
- // 添加任务
- for i := 1; i <= 10; i++ {
- pool.AddJob(i)
- }
- // 关闭工作池并等待完成
- go func() {
- pool.Close()
- }()
- // 收集结果
- for result := range pool.results {
- fmt.Printf("收到结果: %d\n", result)
- }
- }
4. 条件变量示例
- package main
- import (
- "fmt"
- "sync"
- "time"
- )
- type ProducerConsumer struct {
- buffer []int
- mu sync.Mutex
- cond *sync.Cond
- size int
- }
- func NewProducerConsumer(size int) *ProducerConsumer {
- pc := &ProducerConsumer{
- buffer: make([]int, 0, size),
- size: size,
- }
- pc.cond = sync.NewCond(&pc.mu)
- return pc
- }
- func (pc *ProducerConsumer) Produce(item int) {
- pc.mu.Lock()
- defer pc.mu.Unlock()
- // 等待缓冲区有空间
- for len(pc.buffer) >= pc.size {
- fmt.Printf("生产者等待,缓冲区已满\n")
- pc.cond.Wait()
- }
- pc.buffer = append(pc.buffer, item)
- fmt.Printf("生产: %d,缓冲区大小: %d\n", item, len(pc.buffer))
- // 通知消费者
- pc.cond.Signal()
- }
- func (pc *ProducerConsumer) Consume() int {
- pc.mu.Lock()
- defer pc.mu.Unlock()
- // 等待缓冲区有数据
- for len(pc.buffer) == 0 {
- fmt.Printf("消费者等待,缓冲区为空\n")
- pc.cond.Wait()
- }
- item := pc.buffer[0]
- pc.buffer = pc.buffer[1:]
- fmt.Printf("消费: %d,缓冲区大小: %d\n", item, len(pc.buffer))
- // 通知生产者
- pc.cond.Signal()
- return item
- }
- func main() {
- pc := NewProducerConsumer(3)
- // 启动生产者
- go func() {
- for i := 1; i <= 10; i++ {
- pc.Produce(i)
- time.Sleep(200 * time.Millisecond)
- }
- }()
- // 启动消费者
- go func() {
- for i := 0; i < 10; i++ {
- pc.Consume()
- time.Sleep(300 * time.Millisecond)
- }
- }()
- time.Sleep(5 * time.Second)
- }
最佳实践
1. 锁的粒度
- // 不好的做法:锁太大
- type BadExample struct {
- mu sync.Mutex
- data1 map[string]int
- data2 map[string]int
- }
- func (b *BadExample) UpdateData1(key string, value int) {
- b.mu.Lock()
- defer b.mu.Unlock()
- b.data1[key] = value // 只需要锁定data1
- // 但锁住了整个结构体
- }
- // 好的做法:细粒度锁
- type GoodExample struct {
- mu1 sync.Mutex
- mu2 sync.Mutex
- data1 map[string]int
- data2 map[string]int
- }
- func (g *GoodExample) UpdateData1(key string, value int) {
- g.mu1.Lock()
- defer g.mu1.Unlock()
- g.data1[key] = value // 只锁定需要的部分
- }
2. 避免死锁
- // 可能导致死锁的代码
- func (s *Service) Method1() {
- s.mu1.Lock()
- defer s.mu1.Unlock()
- s.Method2() // 可能尝试获取mu1,导致死锁
- }
- func (s *Service) Method2() {
- s.mu1.Lock() // 死锁!
- defer s.mu1.Unlock()
- }
- // 解决方案:使用sync.RWMutex或重新设计
- type Service struct {
- mu sync.RWMutex
- }
- func (s *Service) Method1() {
- s.mu.Lock()
- defer s.mu.Unlock()
- s.Method2() // 现在安全了
- }
- func (s *Service) Method2() {
- s.mu.RLock() // 读锁,不会死锁
- defer s.mu.RUnlock()
- }
3. 使用sync.Once
- package main
- import (
- "fmt"
- "sync"
- )
- type Singleton struct {
- data string
- }
- var (
- instance *Singleton
- once sync.Once
- )
- func GetInstance() *Singleton {
- once.Do(func() {
- instance = &Singleton{data: "单例数据"}
- fmt.Println("创建单例实例")
- })
- return instance
- }
- func main() {
- // 多个goroutine同时调用
- for i := 0; i < 5; i++ {
- go func(id int) {
- instance := GetInstance()
- fmt.Printf("Goroutine %d 获取实例: %v\n", id, instance)
- }(i)
- }
- // 等待所有goroutine完成
- time.Sleep(1 * time.Second)
- }
性能考虑
1. 锁竞争
- // 高竞争情况
- func HighContention() {
- var counter int
- var mu sync.Mutex
- for i := 0; i < 1000; i++ {
- go func() {
- mu.Lock()
- counter++
- mu.Unlock()
- }()
- }
- }
- // 减少竞争:使用原子操作
- import "sync/atomic"
- func LowContention() {
- var counter int64
- for i := 0; i < 1000; i++ {
- go func() {
- atomic.AddInt64(&counter, 1)
- }()
- }
- }
2. 读写锁优化
- type OptimizedCache struct {
- data map[string]interface{}
- mu sync.RWMutex
- }
- func (c *OptimizedCache) Get(key string) (interface{}, bool) {
- c.mu.RLock() // 多个读取者可以同时访问
- defer c.mu.RUnlock()
- return c.data[key], true
- }
- func (c *OptimizedCache) Set(key string, value interface{}) {
- c.mu.Lock() // 写入者独占访问
- defer c.mu.Unlock()
- c.data[key] = value
- }
调试技巧
1. 使用race detector
- go run -race your_program.go
2. 添加调试信息
- type DebugMutex struct {
- sync.Mutex
- name string
- }
- func (dm *DebugMutex) Lock() {
- fmt.Printf("尝试锁定: %s\n", dm.name)
- dm.Mutex.Lock()
- fmt.Printf("已锁定: %s\n", dm.name)
- }
- func (dm *DebugMutex) Unlock() {
- fmt.Printf("解锁: %s\n", dm.name)
- dm.Mutex.Unlock()
- }
