好得很程序员自学网

<tfoot draggable='sEl'></tfoot>

Python多进程完成并行处理的方法介绍

本篇文章主要介绍了Python中使用多进程来实现并行处理的方法小结,具有一定的参考价值,感兴趣的小伙伴们可以参考一下

import os

print('Process (%s) start...' % os.getpid())
# Only works on Unix/Linux/Mac:
pid = os.fork()
if pid == 0:
  print('I am child process (%s) and my parent is %s.' % (os.getpid(), os.getppid()))
else:
  print('I (%s) just created a child process (%s).' % (os.getpid(), pid)) 
from multiprocessing import Process
import os

# 子进程要执行的代码
def run_proc(name):
  print('Run child process %s (%s)...' % (name, os.getpid()))

if __name__=='__main__':
  print('Parent process %s.' % os.getpid())
  p = Process(target=run_proc, args=('test',))
  print('Child process will start.')
  p.start()
  p.join()
  print('Child process end.') 
from multiprocessing import Pool
import os, time, random

def long_time_task(name):
  print('Run task %s (%s)...' % (name, os.getpid()))
  start = time.time()
  time.sleep(random.random() * 3)
  end = time.time()
  print('Task %s runs %0.2f seconds.' % (name, (end - start)))

if __name__=='__main__':
  print('Parent process %s.' % os.getpid())
  p = Pool(4)
  for i in range(5):
    p.apply_async(long_time_task, args=(i,))
  print('Waiting for all subprocesses done...')
  p.close()
  p.join()
  print('All subprocesses done.') 
import subprocess

print('$ nslookup HdhCmsTestpython.org')
r = subprocess.call(['nslookup', 'HdhCmsTestpython.org'])
print('Exit code:', r) 
import subprocess

print('$ nslookup')
p = subprocess.Popen(['nslookup'], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
output, err = p测试数据municate(b'set q=mx\npython.org\nexit\n')
print(output.decode('utf-8'))
print('Exit code:', p.returncode) 
from multiprocessing import Process, Queue
import os, time, random

# 写数据进程执行的代码:
def write(q):
  print('Process to write: %s' % os.getpid())
  for value in ['A', 'B', 'C']:
    print('Put %s to queue...' % value)
    q.put(value)
    time.sleep(random.random())

# 读数据进程执行的代码:
def read(q):
  print('Process to read: %s' % os.getpid())
  while True:
    value = q.get(True)
    print('Get %s from queue.' % value)

if __name__=='__main__':
  # 父进程创建Queue,并传给各个子进程:
  q = Queue()
  pw = Process(target=write, args=(q,))
  pr = Process(target=read, args=(q,))
  # 启动子进程pw,写入:
  pw.start()
  # 启动子进程pr,读取:
  pr.start()
  # 等待pw结束:
  pw.join()
  # pr进程里是死循环,无法等待其结束,只能强行终止:
  pr.terminate() 
import time, threading

# 新线程执行的代码:
def loop():
  print('thread %s is running...' % threading.current_thread().name)
  n = 0
  while n < 5:
    n = n + 1
    print('thread %s >>> %s' % (threading.current_thread().name, n))
    time.sleep(1)
  print('thread %s ended.' % threading.current_thread().name)

print('thread %s is running...' % threading.current_thread().name)
t = threading.Thread(target=loop, name='LoopThread')
t.start()
t.join()
print('thread %s ended.' % threading.current_thread().name)
thread MainThread is running...
thread LoopThread is running...
thread LoopThread >>> 1
thread LoopThread >>> 2
thread LoopThread >>> 3
thread LoopThread >>> 4
thread LoopThread >>> 5
thread LoopThread ended.
thread MainThread ended. 
import time, threading
# 假定这是你的银行存款:
balance = 0
def change_it(n):
  # 先存后取,结果应该为0:
  global balance
  balance = balance + n
  balance = balance - n
def run_thread(n):
  for i in range(100000):
    change_it(n)
t1 = threading.Thread(target=run_thread, args=(5,))
t2 = threading.Thread(target=run_thread, args=(8,))
t1.start()
t2.start()
t1.join()
t2.join()
print(balance) 
x = balance + n
balance = x 
balance = 0
lock = threading.Lock()

def run_thread(n):
  for i in range(100000):
    # 先要获取锁:
    lock.acquire()
    try:
      # 放心地改吧:
      change_it(n)
    finally:
      # 改完了一定要释放锁:
      lock.release() 
import threading, multiprocessing

def loop():
  x = 0
  while True:
    x = x ^ 1

for i in range(multiprocessing.cpu_count()):
  t = threading.Thread(target=loop)
  t.start() 
import threading

# 创建全局ThreadLocal对象:
local_school = threading.local()

def process_student():
  # 获取当前线程关联的student:
  std = local_school.student
  print('Hello, %s (in %s)' % (std, threading.current_thread().name))

def process_thread(name):
  # 绑定ThreadLocal的student:
  local_school.student = name
  process_student()

t1 = threading.Thread(target= process_thread, args=('Alice',), name='Thread-A')
t2 = threading.Thread(target= process_thread, args=('Bob',), name='Thread-B')
t1.start()
t2.start()
t1.join()
t2.join() 
import random, time, queue
from multiprocessing.managers import BaseManager

# 发送任务的队列:
task_queue = queue.Queue()
# 接收结果的队列:
result_queue = queue.Queue()

# 从BaseManager继承的QueueManager:
class QueueManager(BaseManager):
  pass

# 把两个Queue都注册到网络上, callable参数关联了Queue对象:
QueueManager.register('get_task_queue', callable=lambda: task_queue)
QueueManager.register('get_result_queue', callable=lambda: result_queue)
# 绑定端口5000, 设置验证码'abc':
manager = QueueManager(address=('', 5000), authkey=b'abc')
# 启动Queue:
manager.start()
# 获得通过网络访问的Queue对象:
task = manager.get_task_queue()
result = manager.get_result_queue()
# 放几个任务进去:
for i in range(10):
  n = random.randint(0, 10000)
  print('Put task %d...' % n)
  task.put(n)
# 从result队列读取结果:
print('Try get results...')
for i in range(10):
  r = result.get(timeout=10)
  print('Result: %s' % r)
# 关闭:
manager.shutdown()
print('master exit.') 
import time, sys, queue
from multiprocessing.managers import BaseManager

# 创建类似的QueueManager:
class QueueManager(BaseManager):
  pass

# 由于这个QueueManager只从网络上获取Queue,所以注册时只提供名字:
QueueManager.register('get_task_queue')
QueueManager.register('get_result_queue')

# 连接到服务器,也就是运行task_master.py的机器:
server_addr = '127.0.0.1'
print('Connect to server %s...' % server_addr)
# 端口和验证码注意保持与task_master.py设置的完全一致:
m = QueueManager(address=(server_addr, 5000), authkey=b'abc')
# 从网络连接:
m.connect()
# 获取Queue的对象:
task = m.get_task_queue()
result = m.get_result_queue()
# 从task队列取任务,并把结果写入result队列:
for i in range(10):
  try:
    n = task.get(timeout=1)
    print('run task %d * %d...' % (n, n))
    r = '%d * %d = %d' % (n, n, n*n)
    time.sleep(1)
    result.put(r)
  except Queue.Empty:
    print('task queue is empty.')
# 处理结束:
print('worker exit.') 

任务进程要通过网络连接到服务进程,所以要指定服务进程的IP。http://HdhCmsTestarticle/65112.htm

小结

Python的分布式进程接口简单,封装良好,适合需要把繁重任务分布到多台机器的环境下。

注意Queue的作用是用来传递任务和接收结果,每个任务的描述数据量要尽量小。比如发送一个处理日志文件的任务,就不要发送几百兆的日志文件本身,而是发送日志文件存放的完整路径,由Worker进程再去共享的磁盘上读取文件。

以上就是Python多进程完成并行处理的方法介绍的详细内容,更多请关注Gxl网其它相关文章!

查看更多关于Python多进程完成并行处理的方法介绍的详细内容...

  阅读:40次