Barrier

欢迎指正

致力于自动化运维体系和运维技术的研究实践 | 青蛙兄blog

关注Python、GOlang语言在国内的推广、应用 | 青蛙兄blog

3.2引入Python的新功能

名称含义
Barrier(parties,action=None,timeout=None)构建Barrier对象,指定参与方数目,timeout是wait方法未指定超时的默认值
n_waiting当前在屏障中等待的线程数
parites各方数,就是需要多少个等待
wait(timeout=None)等待通过屏障,返回0到线程数-1的整数,每个线程返回不同,如果wait方法设置了超时,并超时发送,屏障将处于broken状态
import threading  
import logging  
  
#输出格式定义  
FORMAT = '%(asctime)-15s\t线程信息:%(thread)d %(threadName)s %(message)s'  
logging.basicConfig(level=logging.INFO,format=FORMAT)  
  
def worker(barrier:threading.Barrier):  
    logging.info('waiting for {} threads.'.format(barrier.n_waiting))  
    try:  
        barrier_id = barrier.wait()  
        logging.info('after barrier {}'.format(barrier_id))  
    except threading.BrokenBarrierError:  
        logging.info('Broken Barrier')  
  
barrier = threading.Barrier(3)  
  
for x in range(3):  #修改4,5,6,试试
  
    threading.Thread(target=worker,args=(barrier,)).start()

从运行结果看出:

所有线程冲到了Barrier前等待,直到到达parties的数目,屏障打开,所有线程停止等待,继续执行,

再有线程wait,屏障就绪等到到达参数方数目

举例,赛马比赛,所有马匹就位,开闸,下一批马匹陆续来到闸门前等待比赛

名称含义
broken如果屏障处于打破的状态,返回True
abort()将屏障置于broken状态,等待中的线程或者调用等待方法的线程中都会抛出BrokenBarrierError异常,直到reset方法来恢复屏障
reset()恢复屏障,重新开始拦截
import threading  
import logging  
  
#输出格式定义  
FORMAT = '%(asctime)-15s\t线程信息:%(thread)2d %(threadName)s %(message)s'  
logging.basicConfig(level=logging.INFO,format=FORMAT)  
  
def worker(barrier:threading.Barrier):  
    logging.info('waiting for {} threads.'.format(barrier.n_waiting))  
    try:  
        barrier_id = barrier.wait()  
        logging.info('after barrier {}'.format(barrier_id))  
    except threading.BrokenBarrierError:  
        logging.info('Broken Barrier')  
  
barrier = threading.Barrier(3)  
  
for x in range(0,9):  
    if x == 2:  
        barrier.abort()  
    elif x == 6:  
        barrier.reset()  
    threading.Event().wait(1)  
    threading.Thread(target=worker,args=(barrier,)).start() 

上例中,屏障中等待了2个,屏障就被break了,waiting的线程抛了BrokenBarrierError异常,新wait的线程也抛异常,直到屏障恢复,才继续按照parties数目要求继续拦截线程

wait方法超时实例

如果wait方法超时发生,屏障将处于broken状态,直到reset

import threading  
import logging  
  
#输出格式定义  
FORMAT = '%(asctime)-15s\t线程信息:%(thread)2d %(threadName)s %(message)s'  
logging.basicConfig(level=logging.INFO,format=FORMAT)  
  
def worker(barrier:threading.Barrier,i:int):  
    logging.info('waiting for {} threads.'.format(barrier.n_waiting))  
    try:  
        logging.info(barrier.broken) #是否broken  
        if i < 3 :  
            barrier_id = barrier.wait(1) #超时后,屏障broken  
        else:  
            if i == 6:  
                barrier.reset()#恢复屏障  
            barrier_id = barrier.wait()  
        logging.info('after barrier {}'.format(barrier_id))  
    except threading.BrokenBarrierError:  
        logging.info('Broken Barrier . run')  
  
barrier = threading.Barrier(3)  
  
for x in range(0,9):  
    threading.Event().wait(1)  
    threading.Thread(target=worker,args=(barrier,x)).start() 

Barrier应用

并发初始化

所有线程都必须初始化完成后,才能继续工作,例如运行前加载数据,检查,如果这些工作没完成,就开始运行,将不能正常工作

10个线程做10种准备,每个线程负责一种工作,只有这10种线程都完成后,才能继续工作,先完成的要等待后完成的线程

例如,启动一个长须,需要先加载磁盘文件,缓存预热,初始化连接池等工作,这些工作齐头并进,不过只有都满足了,程序才能继续向后执行,假设数据库连接失败,则初始化工作失败,就要abort,屏障broken,所有线程收到异常退出

工作量

有10个计算任务,完成6个,就算工作完成

未经允许不得转载:青蛙兄博客 » Barrier

赞 (4)

热门推荐