目录[-]

python 关于多进程与多线程且写入同一文件情况

  任何语言学习多进程与多线程这都市必要的,最近在学习过程中涉及诸多多进程和多线程的情况,但是多进程之间文件是相互独立的,容易发生资源争抢的问题,造成资源的抢夺,同时写入文件使之卡住,由于不规律随机的原因,写入文件的数据也可能是乱码

多进程写入文件

关于网上所整理的一些资料来看,处理的方式各不相同

  • 1.使用进程锁,multiprocessing使用join()方法对进程进行阻塞
  • 2.使用文件锁,python 内置文件锁模块 fcntl

这是解决的两个方法,python的多进程不像linux环境下的多进程一样,linux下的multiprocessing库是基于fork函数,父进程fork了一个子进程之后 ,比如文件句柄都传递给子进程,但是在windows下如果打开一个文件,在子进程中写入会出现ValueError: I/O operation on closed file这样的错误,所以我们在windows下必须在if name == 'main'下使用多进程。

multiprocessing模块

之前我们刚刚说了multiprocessing模块在linux上支持比较好,但是python是跨平台语言。所以这个包提供了一个Process类来代表一个进程对象。这是multiprocessing的核心,,与threading很类似,但是对CPU的利用率会提高。

我们看一下Process类的构造方法:

init(self, group=None, target=None, name=None, args=(), kwargs={})
参数说明:

  • group:进程所属组。基本不用
  • target:表示调用对象。
  • args:表示调用对象的位置参数元组。
  • name:别名
  • kwargs:表示调用对象的字典。
我们接一个简单的例子说明:

import multiprocessing
def func(n):
    #获取进程名称
    name = multiprocessing.current_process().name
    print(name)
    return 
if name == 'main' :
    numList = []
    for i in xrange(5) :
        p = multiprocessing.Process(target=do, args=(i,))
        numList.append(p)
        p.start()
        p.join()
        print("Process end.")

这只是一个简单的用法,我们看看multiprocessing的POOL

POOL类

POOL模块下的各个方法

  • apply(func[, args=()[, kwds={}]])
    该函数用于传递不定参数,同python中的apply函数一致,主进程会被阻塞直到函数执行结束
  • apply_async(func[, args=()[, kwds={}[, callback=None]]])
    与apply用法一致,但它是非阻塞的且支持结果返回后进行回调。
  • map(func, iterable[, chunksize=None])
    Pool类中的map方法,与内置的map函数用法行为基本一致,它会使进程阻塞直到结果返回。 注意:虽然第二个参数是一个迭代器,但在实际使用中,必须在整个队列都就绪后,程序才会运行子进程。
  • map_async(func, iterable[, chunksize[, callback]])
    与map用法一致,但是它是非阻塞的。其有关事项见apply_async
  • close()
    关闭进程池(pool),使其不在接受新的任务。
  • terminal()
    结束工作进程,不在处理未处理的任务。
  • join()
    主进程阻塞等待子进程的退出, join方法要在close或terminate之后使用。

不同方法下的解决方式

方法一使用进程锁:


from multiprocessing import Pool
def run(x):
    return "test"+str(x)
def write_(s):
    with open("test.csv","a+",encoding="utf8") as f:
        f.write(s)
        f.close()
if name == "main":
    pool = Pool(10)
    for x in range(10):
        pool.apply_async(func=run,args=(x,),callback=write_)
    pool.close()
    pool.join()

可能你觉得会有些慢,这是因为你要写入的数据比较少,启动进程也是需要时间的。

方法二使用文件锁:


import multiprocessing
import fcntl

def multi(filepath, fowrite): """ 多进程入口函数 """ pool = multiprocessing.Pool(10) pool.apply_async(multi_take, (fowrite,)) pool.close() pool.join() def multi_take(fowrite): """ 最终结果写入文件(加锁并发写) """ with open(fowrite,'a+') as fo: fcntl.flock(fo.fileno(), fcntl.LOCK_EX) #加锁
fo.write('hello world')