Python实现线程安全队列

最近学习spark,我主要使用pyspark api进行编程。

之前使用Python都是现学现用,用完就忘了也没有理解和记忆,因此这里把Python相关的知识也弥补和记录下来吧

多线程任务队列在实际项目中非常有用,关键的地方要实现队列的多线程同步问题,也即保证队列的多线程安全

例如:可以开多个消费者线程,每个线程上绑定一个队列,这样就实现了多个消费者同时处理不同队列上的任务

同时可以有多个生产者往队列发送消息,实现异步消息处理

先复习下互斥量条件变量的概念:

互斥量(mutex)从本质上说是一把锁,在访问共享资源前对互斥量进行加锁,在访问完成后释放互斥量上的锁。对互斥量进行加锁以后,任何其他试图再次对互斥锁加锁的线程将会阻塞直到当前线程释放该互斥锁。如果释放互斥锁时有多个线程阻塞,所有在该互斥锁上的阻塞线程都会变成可运行状态,第一个变为运行状态的线程可以对互斥锁加锁,其他线程将会看到互斥锁依然被锁住,只能回去再次等待它重新变为可用。

条件变量(cond)是在多线程程序中用来实现"等待--》唤醒"逻辑常用的方法。条件变量利用线程间共享的全局变量进行同步的一种机制,主要包括两个动作:一个线程等待"条件变量的条件成立"而挂起;另一个线程使“条件成立”。为了防止竞争,条件变量的使用总是和一个互斥锁结合在一起。线程在改变条件状态前必须首先锁住互斥量,函数pthread_cond_wait把自己放到等待条件的线程列表上,然后对互斥锁解锁(这两个操作是原子操作)。在函数返回时,互斥量再次被锁住

条件变量总是与互斥锁一起使用的

Python的threading中定义了两种锁:threading.Lock和threading.RLock

两者的不同在于后者是可重入锁,也就是说在一个线程内重复LOCK同一个锁不会发生死锁,这与POSIX中的PTHREAD_MUTEX_RECURSIVE也就是可递归锁的概念是相同的, 互斥锁的API有三个函数,分别执行分配锁,上锁,解锁操作。

Python实现线程安全队列

Python的threading中的条件变量默认绑定了一个RLock,也可以在初始化条件变量的时候传进去一个自己定义的锁.

Python实现线程安全队列

最后贴出我自己实现的简单线程安全任务队列

Python实现线程安全队列

测试代码

Python实现线程安全队列

相关新闻

历经多年发展,已成为国内好评如潮的Linux云计算运维、SRE、Devops、网络安全、云原生、Go、Python开发专业人才培训机构!