Python标准库—subprocess子进程

 2016年12月31日 19:56   Nick王   自动化  开发    1 评论   1002 浏览 

标准库中的subprocess包可以用来fork一个子进程,并运行一个外部的程序。subprocess模块旨在取代一些旧模块的方法:如 os.system、os.spawn*、os.popen*、popen2.*、commands.* 该模块不但可以调用外部的命令作为子进程,而且可以连接到子进程的input/output/error管道,获取相关的返回信息。


subprocess.call()方法

subprocess.call(args, *, stdin=None, stdout=None, stderr=None, shell=False)

父进程等待子进程完成(运行由args指定的命令,直到命令结束后),返回退出信息(returncode,相当于Linux exit code)

>>> import subprocess
>>> returncode = subprocess.call(['uptime'])
……命令结果……
>>> print returncode
0

默认情况下call方法的shell=False,在Linux中shell=False时,call方法使用os.execvp()执行args指定的程序;当shell=True时,如果args是字符串,call方法将直接先运行一个shell,再用这个shell来解释这整个字符串

>>> import subprocess
>>> returncode = subprocess.call('ls -l', shell=True)
……命令结果……
>>> print returncode
0
这两个用法是相同的。

>>> returncode = subprocess.call('exit 127', shell=True)
>>> print returncode
127

shell命令中有一些是shell的内建命令,这些命令必须通过shell运行,$cd。shell=True允许我们运行这样一些命令。


call()方法的源码

def call(*popenargs, **kwargs):
    return Popen(*popenargs, **kwargs).wait()


subprocess.check_call()方法

subprocess.check_call(args, *, stdin=None, stdout=None, stderr=None, shell=False)

父进程等待子进程完成,检查退出信息,如果returncode不为0,则抛出subprocess.CalledProcessError,该对象包含有returncode属性,可用try…except…来检查

>>> subprocess.check_call(['uptime‘])
0

>>> try:
...     subprocess.check_call('exit 127', shell=True)
... except subprocess.CalledProcessError, e:
...     print dir(e)
...     print e.returncode
...     
... 
['__class__', '__delattr__', '__dict__', '__doc__', '__format__', '__getattribute__', '__getitem__', '__getslice__', '__hash__', '__init__', '__module
__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__setstate__', '__sizeof__', '__str__', '__subclasshook__', '__unicode__', 
'__weakref__', 'args', 'cmd', 'message', 'output', 'returncode']
127
>>>

check_call()方法源码

def check_call(*popenargs, **kwargs):
    retcode = call(*popenargs, **kwargs)
    if retcode:
        cmd = kwargs.get("args")
        if cmd is None:
            cmd = popenargs[0]
        raise CalledProcessError(retcode, cmd)
    return 0


subprocess.check_ouput()方法

subprocess.check_output(args, *, stdin=None, stderr=None, shell=False, universal_newlines=False)

父进程等待子进程完成,返回子进程向标准输出的输出结果(字符串形式),检查退出信息,如果returncode不为0,则抛出subprocess.CalledProcessError,该对象包含有returncode属性和output属性,output属性为标准输出的输出结果,可用try…except…来检查。

>>> import subprocess
>>> subprocess.check_output(['echo', 'python'])
'python\n'


>>> try:
...     subprocess.check_output('exit 1', shell=True)
... except subprocess.CalledProcessError, e:
...     print dir(e)
...     print e.returncode
...     
... 
['__class__', '__delattr__', '__dict__', '__doc__', '__format__', '__getattribute__', '__getitem__', '__getslice__', '__hash__', '__init__', '__module
__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__setstate__', '__sizeof__', '__str__', '__subclasshook__', '__unicode__', 
'__weakref__', 'args', 'cmd', 'message', 'output', 'returncode']
1
>>> 


异常类的output属性和check_output的stderr参数
>>> try:
...     subprocess.check_output(['ls', 'fawefaw'], stderr=subprocess.STDOUT)
... except subprocess.CalledProcessError, e:
...     print e.output
...     
... 
ls: fawefaw: No such file or directory

使用Popen时,用于 stderr 参数的特殊值,表示将标准错误重定向到标准输出的同一个句柄


check_output()方法源码

def check_out(*popenargs, **kwargs):
    if 'stdout' in kwargs:
        raise ValueError('stdout argument not allowed, it will be overridden.')
    process = Popen(stdout=PIPE, *popenargs, **kwargs)
    output, unused_err = process.communicate()
    retcode = process.poll()
    if retcode:
        cmd = kwargs.get("args")
        if cmd is None:
            cmd = popenargs[0]
        raise CalledProcessError(retcode, cmd, output=output)
    return output



subprocess模块中的Popen()类

Popen类可以用来创建进程,并与进程进行复杂得交互。构造函数如下:

__init__(self, args, bufsize=0, executable=None, stdin=None, stdout=None, stderr=None, preexec_fn=None, close_fds=False, shell=False, cwd=None, env=None, universal_newlines=False, startupinfo=None, creationflags=0)
  • args可以是字符串或者序列类型(如:list,元组),用于指定进程的可执行文件及其参数。如果是序列类型,第一个元素通常是可执行文件的路径。也可以显式的使用executeable参数来指定可执行文件的路径。

  • bufsize:指定缓冲。0 无缓冲,1 行缓冲,其他 缓冲区大小,负值 系统缓冲(全缓冲)

  • stdin, stdout, stderr分别表示程序的标准输入、输出、错误句柄。可以是PIPE,文件描述符或文件对象,也可以设置为None,表示从父进程继承。

  • preexec_fn只在Unix平台下有效,用于指定一个可执行对象(callable object),它将在子进程运行之前被调用。

  • Close_sfs:在windows平台下,如果close_fds被设置为True,则新创建的子进程将不会继承父进程的输入、输出、错误管道。

  • shell设为true,程序将通过shell来执行。

  • cwd用于设置子进程的当前目录

  • env是字典类型,用于指定子进程的环境变量。如果env = None,子进程的环境变量将从父进程中继承。

  • Universal_newlines:不同操作系统下,文本的换行符是不一样的。如:windows下用'/r/n'表示换,而Linux下用'/n'。如果将此参数设置为True,Python统一把这些换行符当作'/n'来处理

  • startupinfo与createionflags只在windows下有效,它们将被传递给底层的CreateProcess()函数,用于设置子进程的一些属性,如:主窗口的外观,进程的优先级等等


Popen()类的方法

  • Popen.poll():用于检查子进程是否已经结束。设置并返回returncode属性

  • Popen.wait():等待子进程结束。设置并返回returncode属性

  • Popen.communicate(input=None):与子进程进行交互。向stdin发送数据,或从stdout和stderr中读取数据。可选参数input指定发送到子进程的参数。Communicate()返回一个元组:(stdoutdata, stderrdata)。注意:如果希望通过进程的stdin向其发送数据,在创建Popen对象的时候,参数stdin必须被设置为PIPE。同样,如果希望从stdout和stderr获取数据,必须将stdout和stderr设置为PIPE。会阻塞父进程,直到子进程完成。

  • Popen.send_signal(signal):向子进程发送信号。

  • Popen.terminate():停止(stop)子进程

  • Popen.kill():杀死子进程


Popen()类的属性

  • Popen.stdin:如果在创建Popen对象是,参数stdin被设置为PIPE,Popen.stdin将返回一个文件对象用于策子进程发送指令。否则返回None。

  • Popen.stdout:如果在创建Popen对象是,参数stdout被设置为PIPE,Popen.stdout将返回一个文件对象用于策子进程发送指令。否则返回None。

  • Popen.stderr:如果在创建Popen对象是,参数stdout被设置为PIPE,Popen.stdout将返回一个文件对象用于策子进程发送指令。否则返回None。

  • Popen.pid:获取子进程的进程ID。

  • Popen.returncode:获取进程的返回值。如果进程还没有结束,返回None。


Popen对象创建后,主程序不会自动等待子进程完成。必须调用对象的wait()方法,父进程才会阻塞

$ cat sub.py 
#!/usr/bin/env python
#coding: utf-8

import subprocess
child = subprocess.Popen(['ping', '-c', '3', 'www.taobao.com'])
print 'master process'

$ python sub.py 
master process
64 bytes from 220.181.141.252: icmp_seq=0 ttl=33 time=4.135 ms
……省略……



阻塞的情况
$ cat sub.py 
#!/usr/bin/env python
#coding: utf-8

import subprocess
child = subprocess.Popen(['ping', '-c', '3', 'www.taobao.com'])
print child.wait()
print 'master process'

$ python sub.py 
PING www.taobao.com.danuoyi.tbcache.com (220.181.141.252): 56 data bytes
64 bytes from 220.181.141.252: icmp_seq=0 ttl=33 time=24.555 ms
……省略……
round-trip min/avg/max/stddev = 4.986/13.004/24.979/9.606 ms
0
master process


使用Popen类的communicate方法获取程序运行结果,communicate方法默认阻塞

>>> import subprocess
>>> res = subprocess.Popen('ls -l', shell=True, stdout=subprocess.PIPE)
>>> output, errdata = res.communicate()
>>> retcode = res.poll()
>>> print retcode
0
>>> print output
total 56
-rw-r--r--  1 wanghonggang  staff  263 10 27 14:58 bu.py
-rw-r--r--  1 wanghonggang  staff   82 10 27 12:02 ex.py
-rw-r--r--  1 wanghonggang  staff  212 10 27 15:35 ex1.py
-rw-r--r--  1 wanghonggang  staff  176 10 27 13:55 ex2.py
-rw-r--r--  1 wanghonggang  staff  285 10 27 15:54 exec.py
-rw-r--r--  1 wanghonggang  staff  181 10 27 11:37 std.py
-rw-r--r--  1 wanghonggang  staff  162 10 30 13:21 sub.py

>>> print errdata
None
>>> 
接下来再看subprocess.check_output()的源码


进程间通信,使用communicate()方法给子进程的stdin发送数据

$ cat sub.py 
#!/usr/bin/env python
#coding: utf-8

import subprocess
child = subprocess.Popen(['cat'], stdin=subprocess.PIPE)
output, errdata = child.communicate('test stdin')
retcode = child.poll()
print retcode
print output
print errdata


数据流管道

>>> import subprocess
>>> ch1 = subprocess.Popen('ls -l', shell=True, stdout=subprocess.PIPE)
>>> ch2 = subprocess.Popen('wc', shell=True, stdin=ch1.stdout, stdout=subprocess.PIPE)
>>> out, err = ch2.communicate()  ##也可以使用ch2.stdout.read()或者readline()来依次读取
>>> print out
       7      56     362
subprocess.PIPE实际上为文本流提供一个缓存区。ch1的stdout将文本输出到缓存区,随后ch2的stdin从该PIPE中将文本读取走。ch2的输出文本也被存放在PIPE中,直到communicate()方法从PIPE中读取出PIPE中的文本。


按行读取

$ cat sub.py 
#!/usr/bin/env python
#coding: utf-8

import subprocess
child = subprocess.Popen(['ls', '-l'], stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
while True:
    buff = child.stdout.readline()
    print '==>', buff,
    if buff == '' and child.poll() != None:
        break


线程化的ping扫描(摘自Python Unix和Linux系统管理指南)

#!/usr/bin/env python
#coding: utf-8

import subprocess
from Queue import Queue
from threading import Thread

thread_num = 5
queue = Queue()
ips = ['10.111.32.99', '10.111.23.176', '10.111.25.246', '10.111.32.81', '10.111.32.82', '10.111.32.89', '10.111.11.11']

def pinger(i, q):
    while True:
        ip = q.get()
        print 'Thread %s: Ping %s' % (i, ip)
        ret = subprocess.call('ping -c 1 %s' % ip, shell=True, stdout=open('/dev/null', 'w'), stderr=subprocess.STDOUT)
        if ret == 0:
            print '%s: is alive' % ip
        else:
            print '%s: did not respond' % ip
        q.task_done()#在完成一项工作之后,Queue.task_done()函数向任务已经完成的队列发送一个信号

for i in range(thread_num):
    worker = Thread(target=pinger, args=(i, queue))
    worker.setDaemon(True)#使主线程变成守护模式,当主线程结束时,子线程结束而且不管是否执行完毕
    worker.start()

for ip in ips:
    queue.put(ip)

print "Main Thread Waiting"
queue.join() #等待队列为空,再执行别的操作
print "Done."
#join是防止主线程在其他线程获得机会完成队列中的任务之前,从程序中退出。

$ python sub.py 
Thread 0: Ping 10.111.32.99
Thread 1: Ping 10.111.23.176
Thread 2: Ping 10.111.25.246
 Thread 3: Ping 10.111.32.81
Main Thread Waiting
Thread 4: Ping 10.111.32.82
10.111.25.246: is alive
Thread 2: Ping 10.111.32.89
10.111.32.82: is alive
Thread 4: Ping 10.111.11.11
10.111.32.81: is alive
10.111.32.99: is alive
10.111.23.176: is alive
10.111.32.89: is alive
10.111.11.11: did not respond
Done.

多队列与多线程池实现通过IP获取MAC地址(Python Unix和Linux系统管理指南第十章更多精彩例子)

 cat sub.py 
#!/usr/bin/env python
#coding: utf-8

import subprocess
from Queue import Queue
from threading import Thread
import re

ping_thread_num = 5
arp_thread_num = 5
in_queue = Queue()
out_queue = Queue()
ips = ['10.111.32.99', '10.111.23.176', '10.111.25.246', '10.111.32.81', '10.111.32.82', '10.111.32.89', '10.111.11.11']

def pinger(i, iq, oq):
    while True:
        ip = iq.get()
        print 'Thread %s: Ping %s' % (i, ip)
        ret = subprocess.call('ping -c 1 %s' % ip, shell=True, stdout=open('/dev/null', 'w'), stderr=subprocess.STDOUT)
        if ret == 0:
            oq.put(ip)
        else:
            print '%s: did not respond' % ip
        iq.task_done()
def arping(i, oq):
    while True:
        ip = oq.get()
        p = subprocess.Popen('arping -c 1 %s' % ip, shell=True, stdout=subprocess.STDOUT)
        out = p.stdout.read()
        result = out.split()
        pattern = re.compile(":")
        macaddr = None
        for item in result:
            if re.search(pattern, item):
                macaddr = item
        print 'IP Address: %s | MAC Address: %s' % (ip, macaddr)
        oq.task_done()

for ip in ips:
    in_queue.put(ip)

for i in range(ping_thread_num):
    worker = Thread(target=pinger, args=(i, in_queue, out_queue))
    worker.setDaemon(True)
    worker.start()

for i in range(arp_thread_num):
    worker = Thread(target=arping, args=(i, out_queue))
    worker.setDaemon(True)
    worker.start

print "Main Thread Waiting"
in_queue.join()
out_queue.join()
print "Done."


ansible中run_command方法(针对subprocess.Popen的封装)源码注释

def run_command(self, args, check_rc=False, close_fds=True, executable=None, data=None, binary_data=False, path_prefix=None, cwd=None, use_unsafe_shell=False, prompt_regex=None, environ_update=None, umask=None, encoding='utf-8', errors='surrogate_or_strict'):
        shell = False
        text_type = str
        binary_type = bytes
        if isinstance(args, list): #如果args为列表,并且use_unsafe_shell为True,就把args转换为string,那么设置shell为True,否则shell为False
            if use_unsafe_shell:
                args = " ".join([pipes.quote(x) for x in args])
                shell = True
        elif isinstance(args, (binary_type, text_type)) and use_unsafe_shell: #如果args为string并且use_unsafe_shell为True,那么设置shell为True
            shell = True
        elif isinstance(args, (binary_type, text_type)): #如果args为字符串,将args转换为列表
            if PY2:
                args = to_bytes(args, errors='surrogate_or_strict')
            elif PY3:
                args = to_text(args, errors='surrogateescape')
            args = shlex.split(args)
        else: #如果不是list活着string则抛出异常
            msg = "Argument 'args' to run_command must be list or string"
            self.fail_json(rc=257, cmd=args, msg=msg)

        prompt_re = None
        if prompt_regex:
            if isinstance(prompt_regex, text_type):
                if PY3:
                    prompt_regex = to_bytes(prompt_regex, errors='surrogateescape')
                elif PY2:
                    prompt_regex = to_bytes(prompt_regex, errors='surrogate_or_strict')
            try:
                prompt_re = re.compile(prompt_regex, re.MULTILINE)
            except re.error:
                self.fail_json(msg="invalid prompt regular expression given to run_command")

        # expand things like $HOME and ~
        if not shell: #当shell 为False的时候解析args中的变量(os.path.expandvar(path)根据环境变量的值替换path中的变量)
            args = [ os.path.expanduser(os.path.expandvars(x)) for x in args if x is not None ]

        rc = 0
        msg = None
        st_in = None

        # Manipulate the environ we'll send to the new process 处理环境变量
        old_env_vals = {}
        # We can set this from both an attribute and per call
        for key, val in self.run_command_environ_update.items():
            old_env_vals[key] = os.environ.get(key, None)
            os.environ[key] = val
        if environ_update:  #如果定义新的环境变量的值,则用新的来更新
            for key, val in environ_update.items():
                old_env_vals[key] = os.environ.get(key, None)
                os.environ[key] = val
        if path_prefix:
            old_env_vals['PATH'] = os.environ['PATH']
            os.environ['PATH'] = "%s:%s" % (path_prefix, os.environ['PATH'])

        # Clean out python paths set by ansiballz
        if 'PYTHONPATH' in os.environ:
            pypaths = os.environ['PYTHONPATH'].split(':')
            pypaths = [x for x in pypaths \
                        if not x.endswith('/ansible_modlib.zip') \
                        and not x.endswith('/debug_dir')]
            os.environ['PYTHONPATH'] = ':'.join(pypaths)
            if not os.environ['PYTHONPATH']:
                del os.environ['PYTHONPATH']

        #处理args,处理字符集相关
        to_clean_args = args
        if PY2:
            if isinstance(args, text_type):
                to_clean_args = to_bytes(args)
        else:
            if isinstance(args, binary_type):
                to_clean_args = to_text(args)
        if isinstance(args, (text_type, binary_type)):
            to_clean_args = shlex.split(to_clean_args)

        #处理args中密码相关
        clean_args = []
        is_passwd = False
        for arg in to_clean_args:
            if is_passwd:
                is_passwd = False
                clean_args.append('********')
                continue
            if PASSWD_ARG_RE.match(arg):
                sep_idx = arg.find('=')
                if sep_idx > -1:
                    clean_args.append('%s=********' % arg[:sep_idx])
                    continue
                else:
                    is_passwd = True
            arg = heuristic_log_sanitize(arg, self.no_log_values)
            clean_args.append(arg)
        clean_args = ' '.join(pipes.quote(arg) for arg in clean_args)

        if data:#data指定的信息会被写入到命令的标准输入
            st_in = subprocess.PIPE

        kwargs = dict(
            executable=executable,
            shell=shell,
            close_fds=close_fds,
            stdin=st_in,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )

        if cwd and os.path.isdir(cwd): #指定工作目录
            kwargs['cwd'] = cwd

        # store the pwd  #存储当前工作路径
        prev_dir = os.getcwd()

        # 确保进入到正确的工作目录
        if cwd and os.path.isdir(cwd):
            try:
                os.chdir(cwd)
            except (OSError, IOError):
                e = get_exception()
                self.fail_json(rc=e.errno, msg="Could not open %s, %s" % (cwd, str(e)))

        #运行命令时的权限掩码
        old_umask = None
        if umask:
            old_umask = os.umask(umask)

        try:
            if self._debug:
                self.log('Executing: ' + clean_args)
            cmd = subprocess.Popen(args, **kwargs) #看这里~

            # the communication logic here is essentially taken from that
            # of the _communicate() function in ssh.py

            stdout = b('')
            stderr = b('')
            rpipes = [cmd.stdout, cmd.stderr]

            if data: #如果存在data信息
                if not binary_data: #并且binary_data为False,那么每输出一行到新行
                    data += '\n'
                if isinstance(data, text_type):
                    data = to_bytes(data)
                cmd.stdin.write(data) #将data信息写入到stdin
                cmd.stdin.close()

            while True: #使用select的IO模型读取命令执行结果
                rfd, wfd, efd = select.select(rpipes, [], rpipes, 1)
                if cmd.stdout in rfd:
                    dat = os.read(cmd.stdout.fileno(), 9000)
                    stdout += dat
                    if dat == b(''):
                        rpipes.remove(cmd.stdout)
                if cmd.stderr in rfd:
                    dat = os.read(cmd.stderr.fileno(), 9000)
                    stderr += dat
                    if dat == b(''):
                        rpipes.remove(cmd.stderr)
                if prompt_re:
                    if prompt_re.search(stdout) and not data:
                        if encoding:
                            stdout = to_native(stdout, encoding=encoding, errors=errors)
                        else:
                            stdout = stdout
                        return (257, stdout, "A prompt was encountered while running a command, but no input data was specified")
                if (not rpipes or not rfd) and cmd.poll() is not None:
                    break
                elif not rpipes and cmd.poll() == None:
                    cmd.wait()
                    break

            cmd.stdout.close() #关闭文件对象
            cmd.stderr.close()

            rc = cmd.returncode
        except (OSError, IOError):
            e = get_exception()
            self.fail_json(rc=e.errno, msg=to_native(e), cmd=clean_args)
        except Exception:
            e = get_exception()
            self.fail_json(rc=257, msg=to_native(e), exception=traceback.format_exc(), cmd=clean_args)

        # 恢复环境变量
        for key, val in old_env_vals.items():
            if val is None:
                del os.environ[key]
            else:
                os.environ[key] = val

        if old_umask:#恢复原来的权限掩码
            os.umask(old_umask)

        if rc != 0 and check_rc:
            msg = heuristic_log_sanitize(stderr.rstrip(), self.no_log_values)
            self.fail_json(cmd=clean_args, rc=rc, stdout=stdout, stderr=stderr, msg=msg)

        # 回到原来的工作路径
        os.chdir(prev_dir)

        if encoding is not None:
            return (rc, to_native(stdout, encoding=encoding, errors=errors),
                    to_native(stderr, encoding=encoding, errors=errors))
        return (rc, stdout, stderr)


如无特殊说明,文章均为本站原创,转载请注明出处