1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
|
# Copyright (c) 2011 The Chromium OS Authors.
#
# SPDX-License-Identifier: GPL-2.0+
#
import os
import cros_subprocess
"""Shell command ease-ups for Python."""
class CommandResult:
"""A class which captures the result of executing a command.
Members:
stdout: stdout obtained from command, as a string
stderr: stderr obtained from command, as a string
return_code: Return code from command
exception: Exception received, or None if all ok
"""
def __init__(self):
self.stdout = None
self.stderr = None
self.return_code = None
self.exception = None
def RunPipe(pipe_list, infile=None, outfile=None,
capture=False, capture_stderr=False, oneline=False,
raise_on_error=True, cwd=None, **kwargs):
"""
Perform a command pipeline, with optional input/output filenames.
Args:
pipe_list: List of command lines to execute. Each command line is
piped into the next, and is itself a list of strings. For
example [ ['ls', '.git'] ['wc'] ] will pipe the output of
'ls .git' into 'wc'.
infile: File to provide stdin to the pipeline
outfile: File to store stdout
capture: True to capture output
capture_stderr: True to capture stderr
oneline: True to strip newline chars from output
kwargs: Additional keyword arguments to cros_subprocess.Popen()
Returns:
CommandResult object
"""
result = CommandResult()
last_pipe = None
pipeline = list(pipe_list)
user_pipestr = '|'.join([' '.join(pipe) for pipe in pipe_list])
kwargs['stdout'] = None
kwargs['stderr'] = None
while pipeline:
cmd = pipeline.pop(0)
if last_pipe is not None:
kwargs['stdin'] = last_pipe.stdout
elif infile:
kwargs['stdin'] = open(infile, 'rb')
if pipeline or capture:
kwargs['stdout'] = cros_subprocess.PIPE
elif outfile:
kwargs['stdout'] = open(outfile, 'wb')
if capture_stderr:
kwargs['stderr'] = cros_subprocess.PIPE
try:
last_pipe = cros_subprocess.Popen(cmd, cwd=cwd, **kwargs)
except Exception, err:
result.exception = err
if raise_on_error:
raise Exception("Error running '%s': %s" % (user_pipestr, str))
result.return_code = 255
return result
if capture:
result.stdout, result.stderr, result.combined = (
last_pipe.CommunicateFilter(None))
if result.stdout and oneline:
result.output = result.stdout.rstrip('\r\n')
result.return_code = last_pipe.wait()
else:
result.return_code = os.waitpid(last_pipe.pid, 0)[1]
if raise_on_error and result.return_code:
raise Exception("Error running '%s'" % user_pipestr)
return result
def Output(*cmd):
return RunPipe([cmd], capture=True, raise_on_error=False).stdout
def OutputOneLine(*cmd, **kwargs):
raise_on_error = kwargs.pop('raise_on_error', True)
return (RunPipe([cmd], capture=True, oneline=True,
raise_on_error=raise_on_error,
**kwargs).stdout.strip())
def Run(*cmd, **kwargs):
return RunPipe([cmd], **kwargs).stdout
def RunList(cmd):
return RunPipe([cmd], capture=True).stdout
def StopAll():
cros_subprocess.stay_alive = False
|