Python多进程分块读取超大文件的方法,python实例详解
分类:web前端

Python多进程机制实例详解,python实例详解

本文实例讲述了Python多进程机制。分享给大家供大家参考。具体如下:

在以前只是接触过PYTHON的多线程机制,今天搜了一下多进程,相关文章好像不是特别多。看了几篇,小试了一把。程序如下,主要内容就是通过PRODUCER读一个本地文件,一行一行的放到队列中去。然后会有相应的WORKER从队列中取出这些行。

import multiprocessing
import os
import sys
import Queue
import time
def writeQ(q,obj):
    q.put(obj,True,None)
    print "put size: ",q.qsize()
def readQ(q):
    ret = q.get(True,1)
    print "get size: ",q.qsize()
    return ret
def producer(q):
    time.sleep(5)  #让进行休息几秒 方便ps命令看到相关内容
    pid = os.getpid()
    handle_file = '/home/dwapp/joe.wangh/test/multiprocess/datafile'
    with open(handle_file,'r') as f:   #with...as... 这个用法今天也是第一次看到的
        for line in f:
            print "producer <" ,pid , "> is doing: ",line
            writeQ(q,line.strip())
    q.close()
def worker(q):
    time.sleep(5)  #让进行休息几秒 方便ps命令看到相关内容
    pid = os.getpid()
    empty_count = 0
    while True:
        try:
            task = readQ(q)
            print "worker <" , pid , "> is doing: " ,task
            '''
            如果这里不休眠的话 一般情况下所有行都会被同一个子进程读取到 为了使实验效果更加清楚 在这里让每个进程读取完
一行内容时候休眠5s 这样就可以让其他的进程到队列中进行读取
            '''
            time.sleep(5)  
        except Queue.Empty:
            empty_count += 1
            if empty_count == 3:
                print "queue is empty, quit"
                q.close()
                sys.exit(0)
def main():
    concurrence = 3
    q = multiprocessing.Queue(10)
    funcs = [producer , worker]
    for i in range(concurrence-1):
        funcs.append(worker)
    for item in funcs:
        print str(item)
    nfuncs = range( len(funcs) )
    processes = []    
    for i in nfuncs:
        p = multiprocessing.Process(target=funcs[i] , args=(q,))
        processes.append(p)
    print "concurrence worker is : ",concurrence," working start"
    for i in nfuncs:
        processes[i].start()
    for i in nfuncs:
        processes[i].join()
    print "all DONE"
if __name__ == '__main__':
    main()

实验结果如下:

[email protected]:/home/dwapp/joe.wangh/test/multiprocess>python 1.py 
<function producer at 0xb7b9141c>
<function worker at 0xb7b91454>
<function worker at 0xb7b91454>
<function worker at 0xb7b91454>
concurrence worker is : 3 working start
producer < 28320 > is doing: line 1
put size: 1
producer < 28320 > is doing: line 2
put size: 2
producer < 28320 > is doing: line 3
put size: 3
producer < 28320 > is doing: line 4
put size: 3
producer < 28320 > is doing: line 5
get size: 3
put size: 4
worker < 28321 > is doing: line 1
get size: 3
worker < 28322 > is doing: line 2
get size: 2
worker < 28323 > is doing: line 3
get size: 1
worker < 28321 > is doing: line 4
get size: 0
worker < 28322 > is doing: line 5
queue is empty, quit
queue is empty, quit
queue is empty, quit
all DONE

程序运行期间在另外一个窗口进行ps命令 可以观测到一些进程的信息

[email protected]:/home/dwapp/joe.wangh/test/multiprocess>ps -ef | grep python
dwapp  13735 11830 0 Nov20 pts/12  00:00:05 python
dwapp  28319 27481 8 14:04 pts/0  00:00:00 python 1.py
dwapp  28320 28319 0 14:04 pts/0  00:00:00 python 1.py
dwapp  28321 28319 0 14:04 pts/0  00:00:00 python 1.py
dwapp  28322 28319 0 14:04 pts/0  00:00:00 python 1.py
dwapp  28323 28319 0 14:04 pts/0  00:00:00 python 1.py
dwapp  28325 27849 0 14:04 pts/13  00:00:00 grep python
[email protected]:/home/dwapp/joe.wangh/test/multiprocess>ps -ef | grep python
dwapp  13735 11830 0 Nov20 pts/12  00:00:05 python     #此时28320进程 也就是PRODUCER进程已经结束
dwapp  28319 27481 1 14:04 pts/0  00:00:00 python 1.py
dwapp  28321 28319 0 14:04 pts/0  00:00:00 python 1.py
dwapp  28322 28319 0 14:04 pts/0  00:00:00 python 1.py
dwapp  28323 28319 0 14:04 pts/0  00:00:00 python 1.py
dwapp  28328 27849 0 14:04 pts/13  00:00:00 grep python
[email protected]:/home/dwapp/joe.wangh/test/multiprocess>ps -ef | grep python
dwapp  13735 11830 0 Nov20 pts/12  00:00:05 python
dwapp  28319 27481 0 14:04 pts/0  00:00:00 python 1.py
dwapp  28321 28319 0 14:04 pts/0  00:00:00 python 1.py
dwapp  28322 28319 0 14:04 pts/0  00:00:00 python 1.py
dwapp  28323 28319 0 14:04 pts/0  00:00:00 [python] <defunct>  #这里应该是代表28323进程(WORKER)已经运行结束了
dwapp  28331 27849 0 14:04 pts/13  00:00:00 grep python
[email protected]:/home/dwapp/joe.wangh/test/multiprocess>ps -ef | grep python
dwapp  13735 11830 0 Nov20 pts/12  00:00:05 python
dwapp  28337 27849 0 14:05 pts/13  00:00:00 grep python

希望本文所述对大家的Python程序设计有所帮助。

本文实例讲述了Python多进程机制。分享给大家供大家参考。具体如下: 在以前只是接触过PYTHON的...

Python多进程分块读取超大文件的方法,

本文实例讲述了Python多进程分块读取超大文件的方法。分享给大家供大家参考,具体如下:

读取超大的文本文件,使用多进程分块读取,将每一块单独输出成文件

# -*- coding: GBK -*-
import urlparse
import datetime
import os
from multiprocessing import Process,Queue,Array,RLock
"""
多进程分块读取文件
"""
WORKERS = 4
BLOCKSIZE = 100000000
FILE_SIZE = 0
def getFilesize(file):
  """
    获取要读取文件的大小
  """
  global FILE_SIZE
  fstream = open(file,'r')
  fstream.seek(0,os.SEEK_END)
  FILE_SIZE = fstream.tell()
  fstream.close()
def process_found(pid,array,file,rlock):
  global FILE_SIZE
  global JOB
  global PREFIX
  """
    进程处理
    Args:
      pid:进程编号
      array:进程间共享队列,用于标记各进程所读的文件块结束位置
      file:所读文件名称
    各个进程先从array中获取当前最大的值为起始位置startpossition
    结束的位置endpossition (startpossition+BLOCKSIZE) if (startpossition+BLOCKSIZE)<FILE_SIZE else FILE_SIZE
    if startpossition==FILE_SIZE则进程结束
    if startpossition==0则从0开始读取
    if startpossition!=0为防止行被block截断的情况,先读一行不处理,从下一行开始正式处理
    if 当前位置 <=endpossition 就readline
    否则越过边界,就从新查找array中的最大值
  """
  fstream = open(file,'r')
  while True:
    rlock.acquire()
    print 'pid%s'%pid,','.join([str(v) for v in array])
    startpossition = max(array)      
    endpossition = array[pid] = (startpossition+BLOCKSIZE) if (startpossition+BLOCKSIZE)<FILE_SIZE else FILE_SIZE
    rlock.release()
    if startpossition == FILE_SIZE:#end of the file
      print 'pid%s end'%(pid)
      break
    elif startpossition !=0:
      fstream.seek(startpossition)
      fstream.readline()
    pos = ss = fstream.tell()
    ostream = open('/data/download/tmp_pid'+str(pid)+'_jobs'+str(endpossition),'w')
    while pos<endpossition:
      #处理line
      line = fstream.readline()
      ostream.write(line)
      pos = fstream.tell()
    print 'pid:%s,startposition:%s,endposition:%s,pos:%s'%(pid,ss,pos,pos)
    ostream.flush()
    ostream.close()
    ee = fstream.tell()
  fstream.close()
def main():
  global FILE_SIZE
  print datetime.datetime.now().strftime("%Y/%d/%m %H:%M:%S") 
  file = "/data/pds/download/scmcc_log/tmp_format_2011004.log"
  getFilesize(file)
  print FILE_SIZE
  rlock = RLock()
  array = Array('l',WORKERS,lock=rlock)
  threads=[]
  for i in range(WORKERS):
    p=Process(target=process_found, args=[i,array,file,rlock])
    threads.append(p)
  for i in range(WORKERS):
    threads[i].start()
  for i in range(WORKERS):
    threads[i].join()
  print datetime.datetime.now().strftime("%Y/%d/%m %H:%M:%S") 
if __name__ == '__main__':
  main()

更多关于Python相关内容感兴趣的读者可查看本站专题:《Python字符串操作技巧汇总》、《Python入门与进阶经典教程》及《Python文件与目录操作技巧汇总》

希望本文所述对大家Python程序设计有所帮助。

python socket多线程通讯实例分析(聊天室),pythonsocket

本文实例讲述了python socket多线程通讯方法。分享给大家供大家参考,具体如下:

#!/usr/bin/evn python
"""
这是一个Socket+多进程的例子(聊天服务端)
"""
import socket
import threading
# 处理中文数据用的
encoding = "GBK"
def HKServer(client, addr):
 """
 与客户端时实通讯函数
 加入encoding是为了处理输入中文数据
 client 客户socket
 addr  客户address
 """
 # 通知已有的每个客户,有新的成员加入
 for c in clients: c.send(bytes("[%s]加入rn" % addr[1], encoding))
 # 接受客户端数据
 say = b""
 while True:
  data = client.recv(1024)
  if not data: break
  # 如果不是回车键
  if data != b'rn':
   say += data #.encode(encoding)
   continue
  # 把客户端发来的内容发给所有的客户端
  for c in clients:
   c.send(bytes("[%s]:%srn" % (addr[1], say.decode(encoding)), encoding))
  # 内容归x0
  say = b""
 # 客户离开后,从客户列表中移队当前客户,关闭socket连接
 clients.remove(client)
 client.close()
 # 通知已有的每个客户,有成员离开
 for c in clients: c.send(bytes("[%s]离开rn" % addr[1], encoding))
# 客户端列表
clients = []
# 设置IP地址与端口
HOST = ''
PORT = 9999
# 初始化socket
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# 绑定IP地址与端口
s.bind((HOST, PORT))
# 开始监听
s.listen(1)
# 循环等待
while True:
 # 接受客户
 client, addr = s.accept()
 # 启动新的进程与客户通信
 thread = threading.Thread(target=HKServer, args=(client, addr))
 thread.start()
 # 记录新的客户
 clients.append(client)

希望本文所述对大家Python程序设计有所帮助。

Python实现的数据结构与算法之队列详解,python数据结构与算法

本文实例讲述了Python实现的数据结构与算法之队列。分享给大家供大家参考。具体分析如下:

一、概述

队列(Queue)是一种先进先出(FIFO)的线性数据结构,插入操作在队尾(rear)进行,删除操作在队首(front)进行。

二、ADT

队列ADT(抽象数据类型)一般提供以下接口:

① Queue() 创建队列
② enqueue(item) 向队尾插入项
③ dequeue() 返回队首的项,并从队列中删除该项
④ empty() 判断队列是否为空
⑤ size() 返回队列中项的个数

队列操作的示意图如下:

图片 1

三、Python实现

使用Python的内建类型list列表,可以很方便地实现队列ADT:

#!/usr/bin/env python
# -*- coding: utf-8 -*-
class Queue:
  def __init__(self):
    self.items = []
  def enqueue(self, item):
    self.items.append(item)
  def dequeue(self):
    return self.items.pop(0)
  def empty(self):
    return self.size() == 0
  def size(self):
    return len(self.items)

四、应用

著名的 约瑟夫斯问题(Josephus Problem)是应用队列(确切地说,是循环队列)的典型案例。在 约瑟夫斯问题 中,参与者围成一个圆圈,从某个人(队首)开始报数,报数到n+1的人退出圆圈,然后从退出人的下一位重新开始报数;重复以上动作,直到只剩下一个人为止。

值得注意的是,Queue类只实现了简单队列,上述问题实际上需要用循环队列来解决。在报数过程中,通过“将(从队首)出队的人再入队(到队尾)”来模拟循环队列的行为。具体代码如下:

#!/usr/bin/env python
# -*- coding: utf-8 -*-
def josephus(namelist, num):
  simqueue = Queue()
  for name in namelist:
    simqueue.enqueue(name)
  while simqueue.size() > 1:
    for i in xrange(num):
      simqueue.enqueue(simqueue.dequeue())
    simqueue.dequeue()
  return simqueue.dequeue()
if __name__ == '__main__':
  print(josephus(["Bill", "David", "Kent", "Jane", "Susan", "Brad"], 3))

运行结果:

$ python josephus.py
Susan

希望本文所述对大家的Python程序设计有所帮助。

本文实例讲述了Python实现的数据结构与算法之队列。分享给大家供大家参考。...

您可能感兴趣的文章:

  • Python多进程机制实例详解
  • Python多进程并发(multiprocessing)用法实例详解
  • 浅析Python中的多进程与多线程的使用
  • python多进程操作实例
  • python 多进程通信模块的简单实现
  • Python3读取文件常用方法实例分析
  • python使用fileinput模块实现逐行读取文件的方法
  • python逐行读取文件内容的三种方法
  • Python linecache.getline()读取文件中特定一行的脚本

本文实例讲述了Python多进程分块读取超大文件的方法。分享给大家供大家参考,具体如下: 读取超...

您可能感兴趣的文章:

  • Python多线程爬虫简单示例
  • Python实现简单多线程任务队列
  • Python 多线程抓取图片效率对比
  • Python多线程、异步+多进程爬虫实现代码
  • 尝试使用Python多线程抓取代理服务器IP地址的示例
  • 基python实现多线程网页爬虫
  • Python实现多线程抓取妹子图
  • Python多线程结合队列下载百度音乐的方法
  • Python实现快速多线程ping的方法
  • Python多线程下载文件的方法
  • 用Python实现一个简单的多线程TCP服务器的教程
  • 浅析Python多线程下的变量问题
  • Python3中多线程编程的队列运作示例
  • 详解Python Socket网络编程
  • python通过socket查询whois的方法
  • python使用socket远程连接错误处理方法

socket多线程通讯实例分析(聊天室),pythonsocket 本文实例讲述了python socket多线程通讯方法。分享给大家供大家参考,具体如下: #!/usr...

本文由10bet手机官网发布于web前端,转载请注明出处:Python多进程分块读取超大文件的方法,python实例详解

上一篇:没有了 下一篇:Centos6.6 安装rsync服务端,centos6.6rsync
猜你喜欢
热门排行
精彩图文