背景:有一个请求,后端有很多同步执行的 sql,速度比较慢,这里尝试使用 concurrent.futures 包下的 ThreadPoolExecutor,创建一个线程池,然后分配线程同时执行这些 sql,最后获取结果

# 说明

需要注意的是 Python 由于 GIL 限制,其实这里的并行是伪并行,因为 GIL 锁限制进程内的线程只能独享 cpu,并不能利用多核优势,但是由于我们是 io 密集型程序,GIL 在线程 IO 时会释放 cpu 资源同时做线程上下文切换,所以基本可以忽略 GIL 锁对这里使用多线程的影响。

另外其实有个好一些的方法,就是使用 python3.4 后新增功能 - 协程,方法使用 async 修饰,配合 await,能够实现单线程高吞吐量,但是改动比较大,并且底层 orm 需要再次封装,暂时先不考虑。

暂且先使用多线程。

def query_user():
    print(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
    sql = "select * from USER"
    return pd.read_sql_query(sql, engine)
def query_test(name, age):
    print(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
    sql = "select * from TEST"
    return pd.read_sql_query(sql, engine)
def query_info(name):
    print(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
    sql = "select * from INFO"
    return pd.read_sql_query(sql, engine)
def query_rule(name=None, type=None):
    print(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
    sql = "select * from INFO"
    return pd.read_sql_query(sql, engine)
def concurrent_test():
    with concurrent.futures.ThreadPoolExecutor() as executor:
        futures = [executor.submit(query_user),
                   executor.submit(query_test, *['Jalen', 12]),
                   executor.submit(query_rule, **{'type': 'commonRule'}),
                   executor.submit(query_info, 'Jalen')]
        user_value = futures[0].result()
        test_value = futures[1].result()
        rule_value = futures[2].result()
        info_value = futures[3].result()
        print(user_value)
        print(test_value)
        print(rule_value)
        print(info_value)
    print('End!')
if __name__ == '__main__':
    concurrent_test()
    print('End!')