Show More
Commit Description:
merge with algo and add brython files that were missing
Commit Description:
merge with algo and add brython files that were missing
References:
File last commit:
Show/Diff file:
Action:
lib/assets/Lib/os.py
| 1041 lines
| 35.8 KiB
| text/x-python
| PythonLexer
|
r584 | r"""OS routines for Mac, NT, or Posix depending on what system we're on. | |||
This exports: | ||||
- all functions from posix, nt, os2, or ce, e.g. unlink, stat, etc. | ||||
- os.path is either posixpath or ntpath | ||||
- os.name is either 'posix', 'nt', 'os2' or 'ce'. | ||||
- os.curdir is a string representing the current directory ('.' or ':') | ||||
- os.pardir is a string representing the parent directory ('..' or '::') | ||||
- os.sep is the (or a most common) pathname separator ('/' or ':' or '\\') | ||||
- os.extsep is the extension separator (always '.') | ||||
- os.altsep is the alternate pathname separator (None or '/') | ||||
- os.pathsep is the component separator used in $PATH etc | ||||
- os.linesep is the line separator in text files ('\r' or '\n' or '\r\n') | ||||
- os.defpath is the default search path for executables | ||||
- os.devnull is the file path of the null device ('/dev/null', etc.) | ||||
Programs that import and use 'os' stand a better chance of being | ||||
portable between different platforms. Of course, they must then | ||||
only use functions that are defined by all platforms (e.g., unlink | ||||
and opendir), and leave all pathname manipulation to os.path | ||||
(e.g., split and join). | ||||
""" | ||||
import sys, errno | ||||
import stat as st | ||||
_names = sys.builtin_module_names | ||||
# Note: more names are added to __all__ later. | ||||
__all__ = ["altsep", "curdir", "pardir", "sep", "pathsep", "linesep", | ||||
"defpath", "name", "path", "devnull", "SEEK_SET", "SEEK_CUR", | ||||
"SEEK_END", "fsencode", "fsdecode", "get_exec_path", "fdopen", | ||||
"popen", "extsep"] | ||||
def _exists(name):
    """Return True when *name* is currently bound in this module's globals."""
    module_namespace = globals()
    return name in module_namespace
def _get_exports_list(module):
    """Return the public names of *module* as a new list.

    The module's ``__all__`` is used when it has one; otherwise every name
    reported by ``dir(module)`` that does not begin with an underscore.
    """
    try:
        exported = list(module.__all__)
    except AttributeError:
        exported = [attr for attr in dir(module) if attr[0] != '_']
    return exported
# Any new dependencies of the os module and/or changes in path separator
# requires updating importlib as well.
#
# Platform selection: the first of 'posix', 'nt', 'os2' or 'ce' found in
# sys.builtin_module_names supplies the low-level functions (star import),
# the value of ``name``, the line separator and the module bound to ``path``.
if 'posix' in _names:
    name = 'posix'
    linesep = '\n'
    from posix import *
    try:
        # _exit is underscore-prefixed, so the star import above skips it.
        from posix import _exit
        __all__.append('_exit')
    except ImportError:
        pass
    import posixpath as path

    try:
        # Optional: consumed (and deleted) by the supports_* setup below.
        from posix import _have_functions
    except ImportError:
        pass

elif 'nt' in _names:
    name = 'nt'
    linesep = '\r\n'
    from nt import *
    try:
        from nt import _exit
        __all__.append('_exit')
    except ImportError:
        pass
    import ntpath as path

    # Re-export every public name of the platform module.
    import nt
    __all__.extend(_get_exports_list(nt))
    del nt

    try:
        from nt import _have_functions
    except ImportError:
        pass

elif 'os2' in _names:
    name = 'os2'
    linesep = '\r\n'
    from os2 import *
    try:
        from os2 import _exit
        __all__.append('_exit')
    except ImportError:
        pass
    # Builds whose sys.version mentions 'EMX GCC' use os2emxpath and take
    # link() from _emx_link; all other OS/2 builds use ntpath.
    if sys.version.find('EMX GCC') == -1:
        import ntpath as path
    else:
        import os2emxpath as path
        from _emx_link import link

    import os2
    __all__.extend(_get_exports_list(os2))
    del os2

    try:
        from os2 import _have_functions
    except ImportError:
        pass

elif 'ce' in _names:
    name = 'ce'
    linesep = '\r\n'
    from ce import *
    try:
        from ce import _exit
        __all__.append('_exit')
    except ImportError:
        pass
    # We can use the standard Windows path.
    import ntpath as path

    import ce
    __all__.extend(_get_exports_list(ce))
    del ce

    try:
        from ce import _have_functions
    except ImportError:
        pass

else:
    raise ImportError('no os specific module found')

# Register the selected path module as 'os.path' so that the
# ``from os.path import ...`` just below (and ``import os.path`` elsewhere)
# resolves to it.
sys.modules['os.path'] = path
from os.path import (curdir, pardir, sep, pathsep, defpath, extsep, altsep,
    devnull)

del _names
# Build the supports_dir_fd / supports_effective_ids / supports_fd /
# supports_follow_symlinks sets.  Each set holds the function objects from
# this module's globals whose corresponding capability flag is present in
# _have_functions.  All helper names are deleted again at the end.
if _exists("_have_functions"):
    _globals = globals()
    def _add(str, fn):
        # Add the function named *fn* to the set currently bound to _set,
        # provided the function exists and the flag *str* is available.
        if (fn in _globals) and (str in _have_functions):
            _set.add(_globals[fn])

    _set = set()
    _add("HAVE_FACCESSAT",  "access")
    _add("HAVE_FCHMODAT",   "chmod")
    _add("HAVE_FCHOWNAT",   "chown")
    _add("HAVE_FSTATAT",    "stat")
    _add("HAVE_FUTIMESAT",  "utime")
    _add("HAVE_LINKAT",     "link")
    _add("HAVE_MKDIRAT",    "mkdir")
    _add("HAVE_MKFIFOAT",   "mkfifo")
    _add("HAVE_MKNODAT",    "mknod")
    _add("HAVE_OPENAT",     "open")
    _add("HAVE_READLINKAT", "readlink")
    _add("HAVE_RENAMEAT",   "rename")
    _add("HAVE_SYMLINKAT",  "symlink")
    _add("HAVE_UNLINKAT",   "unlink")
    _add("HAVE_UNLINKAT",   "rmdir")
    _add("HAVE_UTIMENSAT",  "utime")
    supports_dir_fd = _set

    _set = set()
    _add("HAVE_FACCESSAT",  "access")
    supports_effective_ids = _set

    _set = set()
    _add("HAVE_FCHDIR",     "chdir")
    _add("HAVE_FCHMOD",     "chmod")
    _add("HAVE_FCHOWN",     "chown")
    _add("HAVE_FDOPENDIR",  "listdir")
    _add("HAVE_FEXECVE",    "execve")
    _set.add(stat) # fstat always works
    _add("HAVE_FTRUNCATE",  "truncate")
    _add("HAVE_FUTIMENS",   "utime")
    _add("HAVE_FUTIMES",    "utime")
    _add("HAVE_FPATHCONF",  "pathconf")
    if _exists("statvfs") and _exists("fstatvfs"): # mac os x10.3
        _add("HAVE_FSTATVFS", "statvfs")
    supports_fd = _set

    _set = set()
    _add("HAVE_FACCESSAT",  "access")
    # Some platforms don't support lchmod().  Often the function exists
    # anyway, as a stub that always returns ENOSUP or perhaps EOPNOTSUPP.
    # (No, I don't know why that's a good design.)  ./configure will detect
    # this and reject it--so HAVE_LCHMOD still won't be defined on such
    # platforms.  This is Very Helpful.
    #
    # However, sometimes platforms without a working lchmod() *do* have
    # fchmodat().  (Examples: Linux kernel 3.2 with glibc 2.15,
    # OpenIndiana 3.x.)  And fchmodat() has a flag that theoretically makes
    # it behave like lchmod().  So in theory it would be a suitable
    # replacement for lchmod().  But when lchmod() doesn't work, fchmodat()'s
    # flag doesn't work *either*.  Sadly ./configure isn't sophisticated
    # enough to detect this condition--it only determines whether or not
    # fchmodat() minimally works.
    #
    # Therefore we simply ignore fchmodat() when deciding whether or not
    # os.chmod supports follow_symlinks.  Just checking lchmod() is
    # sufficient.  After all--if you have a working fchmodat(), your
    # lchmod() almost certainly works too.
    #
    # _add("HAVE_FCHMODAT",   "chmod")
    _add("HAVE_FCHOWNAT",   "chown")
    _add("HAVE_FSTATAT",    "stat")
    _add("HAVE_LCHFLAGS",   "chflags")
    _add("HAVE_LCHMOD",     "chmod")
    if _exists("lchown"): # mac os x10.3
        _add("HAVE_LCHOWN", "chown")
    _add("HAVE_LINKAT",     "link")
    _add("HAVE_LUTIMES",    "utime")
    _add("HAVE_LSTAT",      "stat")
    # Repeats the HAVE_FSTATAT entry above; harmless because _set is a set.
    _add("HAVE_FSTATAT",    "stat")
    _add("HAVE_UTIMENSAT",  "utime")
    _add("MS_WINDOWS",      "stat")
    supports_follow_symlinks = _set

    # Remove the scaffolding so it does not leak into the module namespace.
    del _set
    del _have_functions
    del _globals
    del _add
# Python uses fixed values for the SEEK_ constants; they are mapped
# to native constants if necessary in posixmodule.c
# Other possible SEEK values are directly imported from posixmodule.c
# All three names are listed in __all__ at the top of the module.
SEEK_SET = 0
SEEK_CUR = 1
SEEK_END = 2
def _get_masked_mode(mode):
    """Return *mode* with the bits of the current process umask cleared.

    umask() both sets a new mask and returns the previous one, so the
    current value is read by setting 0 and immediately restoring it.
    """
    current_mask = umask(0)
    umask(current_mask)
    return mode & ~current_mask
# Super directory utilities.
# (Inspired by Eric Raymond; the doc strings are mostly his)

def makedirs(name, mode=0o777, exist_ok=False):
    """makedirs(path [, mode=0o777][, exist_ok=False])

    Super-mkdir; create a leaf directory and all intermediate ones.  Works
    like mkdir, except that any intermediate path segment (not just the
    rightmost) will be created if it does not exist.  This is recursive.

    If the target already exists, OSError (errno EEXIST) is raised unless
    exist_ok is true *and* the existing target is a directory.

    The permission bits of an existing directory are deliberately not
    compared with *mode*: doing so requires temporarily resetting the
    process umask, which cannot be done safely when other threads are
    creating files, and it turns a harmless mismatch into an error.
    """
    head, tail = path.split(name)
    if not tail:
        # name ended with a separator: split again to get the real leaf.
        head, tail = path.split(head)
    if head and tail and not path.exists(head):
        try:
            makedirs(head, mode, exist_ok)
        except OSError as e:
            # be happy if someone already created the path
            if e.errno != errno.EEXIST:
                raise
        cdir = curdir
        if isinstance(tail, bytes):
            cdir = bytes(curdir, 'ASCII')
        if tail == cdir:           # xxx/newdir/. exists if xxx/newdir exists
            return
    try:
        mkdir(name, mode)
    except OSError as e:
        # Tolerate only "already exists" for a real directory, and only when
        # the caller asked for that; every other failure propagates.
        if not exist_ok or e.errno != errno.EEXIST or not path.isdir(name):
            raise
def removedirs(name):
    """removedirs(path)

    Super-rmdir: delete the leaf directory, then walk back up the path
    removing each parent in turn for as long as that succeeds.

    A failure to remove the leaf itself propagates to the caller.  The first
    failure while pruning a parent (typically because it is not empty)
    silently ends the pruning.
    """
    rmdir(name)
    parent, leaf = path.split(name)
    if not leaf:
        # name ended with a separator; split once more for the real leaf.
        parent, leaf = path.split(parent)
    while parent and leaf:
        try:
            rmdir(parent)
        except error:
            return
        parent, leaf = path.split(parent)
def renames(old, new):
    """renames(old, new)

    Super-rename: make sure the directory that will contain *new* exists
    (creating intermediate directories as needed), rename *old* to *new*,
    then prune the directories left behind on the *old* side with
    removedirs(), ignoring any error from that pruning.

    Note: this function can fail with the new directory structure already
    made if you lack the permissions needed to unlink the leaf directory
    or file.
    """
    new_parent, new_leaf = path.split(new)
    if new_parent and new_leaf and not path.exists(new_parent):
        makedirs(new_parent)
    rename(old, new)

    old_parent, old_leaf = path.split(old)
    if not (old_parent and old_leaf):
        return
    try:
        removedirs(old_parent)
    except error:
        pass
__all__.extend(["makedirs", "removedirs", "renames"]) | ||||
def walk(top, topdown=True, onerror=None, followlinks=False):
    """Directory tree generator.

    For every directory in the tree rooted at *top* (including *top*
    itself), yield a 3-tuple

        dirpath, dirnames, filenames

    dirpath is the path to the directory.  dirnames lists the names of its
    subdirectories and filenames the names of everything else; both hold
    bare names, so use os.path.join(dirpath, name) to get a path that
    begins with *top*.

    With *topdown* true (the default) a directory's triple is produced
    before those of its subdirectories, and the caller may edit dirnames
    in place (del, slice assignment) to prune the walk or impose a visiting
    order.  With *topdown* false the triple is produced after those of the
    subdirectories, so editing dirnames has no effect.

    Errors from listing a directory are ignored by default.  If *onerror*
    is given it is called with the os.error instance; it may report the
    error and return to let the walk continue, or raise to abort it.  The
    offending path is available as the exception's filename attribute.

    Symbolic links to directories are listed in dirnames but are only
    descended into when *followlinks* is true.

    Caution: if *top* is relative, do not change the current working
    directory between resumptions of walk; walk never changes it itself
    and assumes the caller does not either.

    Example:

    import os
    from os.path import join, getsize
    for root, dirs, files in os.walk('python/Lib/email'):
        print(root, "consumes", end="")
        print(sum([getsize(join(root, name)) for name in files]), end="")
        print("bytes in", len(files), "non-directory files")
        if 'CVS' in dirs:
            dirs.remove('CVS')  # don't visit CVS directories
    """
    islink, join, isdir = path.islink, path.join, path.isdir

    # A directory we cannot list must not abort the whole walk: report it
    # through onerror (if any) and produce nothing for it.
    try:
        # listdir and error are module globals via the platform star import.
        entries = listdir(top)
    except error as err:
        if onerror is not None:
            onerror(err)
        return

    dirs, nondirs = [], []
    for entry in entries:
        bucket = dirs if isdir(join(top, entry)) else nondirs
        bucket.append(entry)

    if topdown:
        yield top, dirs, nondirs
    # dirs is iterated after the top-down yield so that the caller's
    # in-place edits take effect.
    for subdir in dirs:
        child = join(top, subdir)
        if followlinks or not islink(child):
            yield from walk(child, topdown, onerror, followlinks)
    if not topdown:
        yield top, dirs, nondirs
__all__.append("walk") | ||||
if {open, stat} <= supports_dir_fd and {listdir, stat} <= supports_fd:

    def fwalk(top=".", topdown=True, onerror=None, *, follow_symlinks=False, dir_fd=None):
        """Directory tree generator.

        This behaves exactly like walk(), except that it yields a 4-tuple

            dirpath, dirnames, filenames, dirfd

        `dirpath`, `dirnames` and `filenames` are identical to walk() output,
        and `dirfd` is a file descriptor referring to the directory `dirpath`.

        The advantage of fwalk() over walk() is that it's safe against symlink
        races (when follow_symlinks is False).

        If dir_fd is not None, it should be a file descriptor open to a directory,
          and top should be relative; top will then be relative to that directory.
          (dir_fd is always supported for fwalk.)

        If a subdirectory cannot be stat'ed or opened, onerror (when given)
        is called with the exception and the walk carries on with the
        remaining subdirectories.

        Caution:
        Since fwalk() yields file descriptors, those are only valid until the
        next iteration step, so you should dup() them if you want to keep them
        for a longer period.

        Example:

        import os
        for root, dirs, files, rootfd in os.fwalk('python/Lib/email'):
            print(root, "consumes", end="")
            print(sum([os.stat(name, dir_fd=rootfd).st_size for name in files]),
                  end="")
            print("bytes in", len(files), "non-directory files")
            if 'CVS' in dirs:
                dirs.remove('CVS')  # don't visit CVS directories
        """
        # Note: To guard against symlink races, we use the standard
        # lstat()/open()/fstat() trick.
        orig_st = stat(top, follow_symlinks=False, dir_fd=dir_fd)
        topfd = open(top, O_RDONLY, dir_fd=dir_fd)
        try:
            if (follow_symlinks or (st.S_ISDIR(orig_st.st_mode) and
                                    path.samestat(orig_st, stat(topfd)))):
                yield from _fwalk(topfd, top, topdown, onerror, follow_symlinks)
        finally:
            close(topfd)

    def _fwalk(topfd, toppath, topdown, onerror, follow_symlinks):
        """Recursive worker for fwalk(); *topfd* is an open fd for *toppath*."""
        # Note: This uses O(depth of the directory tree) file descriptors: if
        # necessary, it can be adapted to only require O(1) FDs, see issue
        # #13734.

        names = listdir(topfd)
        dirs, nondirs = [], []
        for name in names:
            try:
                # Here, we don't use AT_SYMLINK_NOFOLLOW to be consistent with
                # walk() which reports symlinks to directories as directories.
                # We do however check for symlinks before recursing into
                # a subdirectory.
                if st.S_ISDIR(stat(name, dir_fd=topfd).st_mode):
                    dirs.append(name)
                else:
                    nondirs.append(name)
            except FileNotFoundError:
                try:
                    # Add dangling symlinks, ignore disappeared files
                    if st.S_ISLNK(stat(name, dir_fd=topfd, follow_symlinks=False)
                                .st_mode):
                        nondirs.append(name)
                except FileNotFoundError:
                    continue

        if topdown:
            yield toppath, dirs, nondirs, topfd

        for name in dirs:
            try:
                orig_st = stat(name, dir_fd=topfd, follow_symlinks=follow_symlinks)
                dirfd = open(name, O_RDONLY, dir_fd=topfd)
            except error as err:
                if onerror is not None:
                    onerror(err)
                # Skip only this subdirectory.  Returning here would drop
                # every remaining sibling and the bottom-up yield below.
                continue
            try:
                if follow_symlinks or path.samestat(orig_st, stat(dirfd)):
                    dirpath = path.join(toppath, name)
                    yield from _fwalk(dirfd, dirpath, topdown, onerror, follow_symlinks)
            finally:
                close(dirfd)

        if not topdown:
            yield toppath, dirs, nondirs, topfd
__all__.append("fwalk") | ||||
# Make sure os.environ exists, at least
# The bare name lookup raises NameError when the platform star import did
# not bind ``environ``; an empty dict is used in that case.
try:
    environ
except NameError:
    environ = {}
def execl(file, *args):
    """execl(file, *args)

    Execute the executable file with argument list args, replacing the
    current process.

    The positional arguments are forwarded to execv() as a single tuple.
    """
    execv(file, args)
def execle(file, *args):
    """execle(file, *args, env)

    Replace the current process with the executable *file*.  The last
    positional argument is the environment; everything before it forms
    the argument list passed to execve().
    """
    execve(file, args[:-1], args[-1])
def execlp(file, *args):
    """execlp(file, *args)

    Execute the executable file (which is searched for along $PATH)
    with argument list args, replacing the current process.

    The positional arguments are forwarded to execvp() as a single tuple.
    """
    execvp(file, args)
def execlpe(file, *args):
    """execlpe(file, *args, env)

    Replace the current process with the executable *file*, which is
    searched for along $PATH.  The last positional argument is the
    environment; everything before it forms the argument list passed to
    execvpe().
    """
    execvpe(file, args[:-1], args[-1])
def execvp(file, args):
    """execvp(file, args)

    Execute the executable file (which is searched for along $PATH)
    with argument list args, replacing the current process.
    args may be a list or tuple of strings.

    Delegates to _execvpe() without an explicit environment.
    """
    _execvpe(file, args)
def execvpe(file, args, env):
    """execvpe(file, args, env)

    Execute the executable file (which is searched for along $PATH)
    with argument list args and environment env, replacing the
    current process.
    args may be a list or tuple of strings.

    Delegates to _execvpe() with *env* passed through.
    """
    _execvpe(file, args, env)
__all__.extend(["execl","execle","execlp","execlpe","execvp","execvpe"]) | ||||
def _execvpe(file, args, env=None):
    """Shared implementation of execvp() and execvpe().

    With *env* given, execve() is used and *env* also supplies the search
    path; otherwise execv() is used and the search path comes from environ.
    A *file* that contains a directory part is executed directly.  Otherwise
    each directory from get_exec_path() is tried in order.

    Returns only when the direct exec call itself returns.  When every
    candidate fails, the first error other than ENOENT/ENOTDIR is re-raised
    with its traceback; failing that, the last error seen.
    """
    if env is None:
        exec_func = execv
        argrest = (args,)
        env = environ
    else:
        exec_func = execve
        argrest = (args, env)

    dirname, _basename = path.split(file)
    if dirname:
        exec_func(file, *argrest)
        return

    last_exc = saved_exc = None
    saved_tb = None
    search_dirs = get_exec_path(env)
    if name != 'nt':
        # Off Windows the candidates are built as bytes.
        file = fsencode(file)
        search_dirs = map(fsencode, search_dirs)
    for directory in search_dirs:
        candidate = path.join(directory, file)
        try:
            exec_func(candidate, *argrest)
        except error as exc:
            last_exc = exc
            tb = sys.exc_info()[2]
            not_found = exc.errno in (errno.ENOENT, errno.ENOTDIR)
            # Remember only the first failure that is more telling than
            # "no such file here".
            if saved_exc is None and not not_found:
                saved_exc = exc
                saved_tb = tb
    if saved_exc:
        raise saved_exc.with_traceback(saved_tb)
    raise last_exc.with_traceback(tb)
def get_exec_path(env=None):
    """Return the list of directories searched for a named executable
    (similar to a shell) when launching a process.

    *env* must be an environment variable dict or None.  If *env* is None,
    os.environ is used.  When no PATH entry is found, defpath is split
    instead.  Raises ValueError if *env* holds both 'PATH' and b'PATH'.
    """
    # Imported here rather than at module level to keep the set of modules
    # loaded at interpreter startup small; it may also avoid a bootstrap
    # issue.
    import warnings

    if env is None:
        env = environ

    # Looking up a str key in a bytes-keyed mapping (or the reverse) emits
    # BytesWarning under python -b / -bb; silence it for these lookups.
    with warnings.catch_warnings():
        warnings.simplefilter("ignore", BytesWarning)

        try:
            search_path = env.get('PATH')
        except TypeError:
            search_path = None

        if supports_bytes_environ:
            try:
                search_path_bytes = env[b'PATH']
            except (KeyError, TypeError):
                pass
            else:
                if search_path is not None:
                    raise ValueError(
                        "env cannot contain 'PATH' and b'PATH' keys")
                search_path = search_path_bytes

            if isinstance(search_path, bytes):
                search_path = fsdecode(search_path)

    if search_path is None:
        search_path = defpath
    return search_path.split(pathsep)
# Change environ to automatically call putenv(), unsetenv if they exist. | ||||
from collections.abc import MutableMapping | ||||
class _Environ(MutableMapping):
    """Mutable mapping over an environment dict that mirrors writes.

    Keys and values are stored in *data* in encoded form.  The encode/decode
    callables translate between that form and what callers see, and every
    assignment or deletion is also forwarded to the *putenv* / *unsetenv*
    callables.
    """

    def __init__(self, data, encodekey, decodekey, encodevalue, decodevalue, putenv, unsetenv):
        self._data = data
        self.encodekey, self.decodekey = encodekey, decodekey
        self.encodevalue, self.decodevalue = encodevalue, decodevalue
        self.putenv, self.unsetenv = putenv, unsetenv

    def __getitem__(self, key):
        try:
            raw_value = self._data[self.encodekey(key)]
        except KeyError:
            # report the key as the caller spelled it, not the encoded form
            raise KeyError(key) from None
        return self.decodevalue(raw_value)

    def __setitem__(self, key, value):
        raw_key = self.encodekey(key)
        raw_value = self.encodevalue(value)
        # Forward first, so a failing putenv leaves the dict untouched.
        self.putenv(raw_key, raw_value)
        self._data[raw_key] = raw_value

    def __delitem__(self, key):
        raw_key = self.encodekey(key)
        self.unsetenv(raw_key)
        try:
            del self._data[raw_key]
        except KeyError:
            # report the key as the caller spelled it, not the encoded form
            raise KeyError(key) from None

    def __iter__(self):
        yield from (self.decodekey(raw_key) for raw_key in self._data)

    def __len__(self):
        return len(self._data)

    def __repr__(self):
        pairs = []
        for raw_key, raw_value in self._data.items():
            pairs.append('{!r}: {!r}'.format(self.decodekey(raw_key),
                                             self.decodevalue(raw_value)))
        return 'environ({{{}}})'.format(', '.join(pairs))

    def copy(self):
        """Return a plain dict snapshot of the decoded items."""
        return dict(self)

    def setdefault(self, key, value):
        """Store *value* under *key* if absent; return the stored value."""
        absent = key not in self
        if absent:
            self[key] = value
        return self[key]
# Bind private aliases for putenv/unsetenv.  A name is added to __all__
# only when the platform module actually provided the function.
try:
    _putenv = putenv
except NameError:
    # No putenv available: the fallback does nothing.
    _putenv = lambda key, value: None
else:
    __all__.append("putenv")

try:
    _unsetenv = unsetenv
except NameError:
    # No unsetenv available: fall back to assigning an empty string
    # through _putenv.
    _unsetenv = lambda key: _putenv(key, "")
else:
    __all__.append("unsetenv")
def _createenviron():
    """Build the str-based _Environ wrapper around the platform environ.

    On 'os2' and 'nt' keys are upper-cased and values are kept as str in a
    new dict.  Elsewhere the existing environ mapping is wrapped directly
    and keys/values are converted with the filesystem encoding and the
    'surrogateescape' error handler.
    """
    if name in ('os2', 'nt'):
        # Where Env Var Names Must Be UPPERCASE
        def check_str(value):
            if not isinstance(value, str):
                raise TypeError("str expected, not %s" % type(value).__name__)
            return value

        encode = check_str
        decode = str

        def encodekey(key):
            return encode(key).upper()

        data = {encodekey(key): value for key, value in environ.items()}
    else:
        # Where Env Var Names Can Be Mixed Case
        encoding = sys.getfilesystemencoding()

        def encode(value):
            if not isinstance(value, str):
                raise TypeError("str expected, not %s" % type(value).__name__)
            return value.encode(encoding, 'surrogateescape')

        def decode(value):
            return value.decode(encoding, 'surrogateescape')

        encodekey = encode
        data = environ

    return _Environ(data, encodekey, decode, encode, decode, _putenv, _unsetenv)
# unicode environ | ||||
environ = _createenviron() | ||||
del _createenviron | ||||
def getenv(key, default=None):
    """Return the value of environment variable *key*.

    *default* (None unless given) is returned when the variable is not set.
    key, default and the result are str.
    """
    try:
        return environ[key]
    except KeyError:
        return default
supports_bytes_environ = name not in ('os2', 'nt') | ||||
__all__.extend(("getenv", "supports_bytes_environ")) | ||||
# Bytes view of the environment, defined only where supports_bytes_environ
# is true.
if supports_bytes_environ:
    def _check_bytes(value):
        # Validator used for both keys and values: accept bytes only.
        if not isinstance(value, bytes):
            raise TypeError("bytes expected, not %s" % type(value).__name__)
        return value

    # bytes environ
    # Wraps the same underlying dict as ``environ`` (environ._data), so
    # changes made through either object are visible through the other.
    environb = _Environ(environ._data,
        _check_bytes, bytes,
        _check_bytes, bytes,
        _putenv, _unsetenv)
    del _check_bytes

    def getenvb(key, default=None):
        """Get an environment variable, return None if it doesn't exist.
        The optional second argument can specify an alternate default.
        key, default and the result are bytes."""
        return environb.get(key, default)

    __all__.extend(("environb", "getenvb"))
def _fscodec():
    """Create and return the (fsencode, fsdecode) function pair.

    Both closures share the filesystem encoding read once here, together
    with the error handler: 'strict' for the 'mbcs' encoding and
    'surrogateescape' for every other encoding.
    """
    encoding = sys.getfilesystemencoding()
    errors = 'strict' if encoding == 'mbcs' else 'surrogateescape'

    def fsencode(filename):
        """
        Encode filename to the filesystem encoding with 'surrogateescape' error
        handler, return bytes unchanged. On Windows, use 'strict' error handler if
        the file system encoding is 'mbcs' (which is the default encoding).
        """
        if isinstance(filename, bytes):
            return filename
        if isinstance(filename, str):
            return filename.encode(encoding, errors)
        raise TypeError("expect bytes or str, not %s" % type(filename).__name__)

    def fsdecode(filename):
        """
        Decode filename from the filesystem encoding with 'surrogateescape' error
        handler, return str unchanged. On Windows, use 'strict' error handler if
        the file system encoding is 'mbcs' (which is the default encoding).
        """
        if isinstance(filename, str):
            return filename
        if isinstance(filename, bytes):
            return filename.decode(encoding, errors)
        raise TypeError("expect bytes or str, not %s" % type(filename).__name__)

    return fsencode, fsdecode
fsencode, fsdecode = _fscodec() | ||||
del _fscodec | ||||
# Supply spawn*() (probably only for Unix)
# This emulation is defined only when fork() and execv() exist and the
# platform module did not already provide spawnv().
if _exists("fork") and not _exists("spawnv") and _exists("execv"):

    P_WAIT = 0
    P_NOWAIT = P_NOWAITO = 1

    __all__.extend(["P_WAIT", "P_NOWAIT", "P_NOWAITO"])

    # XXX Should we support P_DETACH?  I suppose it could fork()**2
    # and close the std I/O streams.  Also, P_OVERLAY is the same
    # as execv*()?

    def _spawnvef(mode, file, args, env, func):
        # Internal helper; func is the exec*() function to use
        # It is called as func(file, args) when env is None and as
        # func(file, args, env) otherwise.
        pid = fork()
        if not pid:
            # Child
            try:
                if env is None:
                    func(file, args)
                else:
                    func(file, args, env)
            except:
                # Any failure in the child ends it with status 127, so the
                # child never continues running the parent's Python code.
                _exit(127)
        else:
            # Parent
            if mode == P_NOWAIT:
                return pid # Caller is responsible for waiting!
            # Any other mode: block until the child is signaled or exits.
            while 1:
                wpid, sts = waitpid(pid, 0)
                if WIFSTOPPED(sts):
                    # Stopped, not finished: keep waiting.
                    continue
                elif WIFSIGNALED(sts):
                    return -WTERMSIG(sts)
                elif WIFEXITED(sts):
                    return WEXITSTATUS(sts)
                else:
                    raise error("Not stopped, signaled or exited???")

    def spawnv(mode, file, args):
        """spawnv(mode, file, args) -> integer

Execute file with arguments from args in a subprocess.
If mode == P_NOWAIT return the pid of the process.
If mode == P_WAIT return the process's exit code if it exits normally;
otherwise return -SIG, where SIG is the signal that killed it. """
        return _spawnvef(mode, file, args, None, execv)

    def spawnve(mode, file, args, env):
        """spawnve(mode, file, args, env) -> integer

Execute file with arguments from args in a subprocess with the
specified environment.
If mode == P_NOWAIT return the pid of the process.
If mode == P_WAIT return the process's exit code if it exits normally;
otherwise return -SIG, where SIG is the signal that killed it. """
        return _spawnvef(mode, file, args, env, execve)

    # Note: spawnvp[e] isn't currently supported on Windows

    def spawnvp(mode, file, args):
        """spawnvp(mode, file, args) -> integer

Execute file (which is looked for along $PATH) with arguments from
args in a subprocess.
If mode == P_NOWAIT return the pid of the process.
If mode == P_WAIT return the process's exit code if it exits normally;
otherwise return -SIG, where SIG is the signal that killed it. """
        return _spawnvef(mode, file, args, None, execvp)

    def spawnvpe(mode, file, args, env):
        """spawnvpe(mode, file, args, env) -> integer

Execute file (which is looked for along $PATH) with arguments from
args in a subprocess with the supplied environment.
If mode == P_NOWAIT return the pid of the process.
If mode == P_WAIT return the process's exit code if it exits normally;
otherwise return -SIG, where SIG is the signal that killed it. """
        return _spawnvef(mode, file, args, env, execvpe)
if _exists("spawnv"):
    # The variadic spawnl*() forms are not supplied by the basic Windows
    # code, but are thin Python wrappers over spawnv()/spawnve().

    def spawnl(mode, file, *args):
        """spawnl(mode, file, *args) -> integer

Run file in a subprocess, passing the remaining positional arguments
as its argument list.
If mode == P_NOWAIT return the pid of the process.
If mode == P_WAIT return the process's exit code if it exits normally;
otherwise return -SIG, where SIG is the signal that killed it. """
        result = spawnv(mode, file, args)
        return result

    def spawnle(mode, file, *args):
        """spawnle(mode, file, *args, env) -> integer

Run file in a subprocess with the supplied environment.  The last
positional argument is the environment; those before it form the
argument list.
If mode == P_NOWAIT return the pid of the process.
If mode == P_WAIT return the process's exit code if it exits normally;
otherwise return -SIG, where SIG is the signal that killed it. """
        return spawnve(mode, file, args[:-1], args[-1])

    __all__.extend(["spawnv", "spawnve", "spawnl", "spawnle",])
if _exists("spawnvp"):
    # Windows does not implement spawnvp[e] at the moment, so it will not
    # get spawnlp[e] either.

    def spawnlp(mode, file, *args):
        """spawnlp(mode, file, *args) -> integer

        Run file (searched for along $PATH) in a subprocess, passing it
        the positional arguments collected in args.

        mode == P_NOWAIT: return the pid of the new process.
        mode == P_WAIT: return the exit code of the process when it exits
        normally, or -SIG when it was killed by signal SIG.
        """
        argv = args
        return spawnvp(mode, file, argv)

    def spawnlpe(mode, file, *args):
        """spawnlpe(mode, file, *args, env) -> integer

        Run file (searched for along $PATH) in a subprocess with the
        supplied environment.  The last positional argument is the
        environment; everything before it is the argument list of the
        new process.

        mode == P_NOWAIT: return the pid of the new process.
        mode == P_WAIT: return the exit code of the process when it exits
        normally, or -SIG when it was killed by signal SIG.
        """
        # Split the trailing environment off the argument tuple.
        argv, env = args[:-1], args[-1]
        return spawnvpe(mode, file, argv, env)

    __all__.extend(("spawnvp", "spawnvpe", "spawnlp", "spawnlpe"))
import copyreg as _copyreg | ||||
def _make_stat_result(tup, dict):
    """Rebuild a stat_result from the (tup, dict) pair stored in a pickle."""
    rebuilt = stat_result(tup, dict)
    return rebuilt


def _pickle_stat_result(sr):
    """Reduce function for stat_result: return (constructor, arguments)."""
    # Only the argument part of __reduce__() is reused; the callable it
    # names is replaced by _make_stat_result.
    _reduce_callable, ctor_args = sr.__reduce__()
    return (_make_stat_result, ctor_args)


try:
    _copyreg.pickle(stat_result, _pickle_stat_result, _make_stat_result)
except NameError:
    # stat_result may not exist
    pass
def _make_statvfs_result(tup, dict):
    """Rebuild a statvfs_result from the (tup, dict) pair stored in a pickle."""
    rebuilt = statvfs_result(tup, dict)
    return rebuilt


def _pickle_statvfs_result(sr):
    """Reduce function for statvfs_result: return (constructor, arguments)."""
    # Only the argument part of __reduce__() is reused; the callable it
    # names is replaced by _make_statvfs_result.
    _reduce_callable, ctor_args = sr.__reduce__()
    return (_make_statvfs_result, ctor_args)


try:
    _copyreg.pickle(
        statvfs_result,
        _pickle_statvfs_result,
        _make_statvfs_result,
    )
except NameError:
    # statvfs_result may not exist
    pass
# Supply os.popen() | ||||
def popen(cmd, mode="r", buffering=-1):
    """Run cmd through the shell and return a file-like pipe connected to it.

    cmd       -- the command line, as a str.
    mode      -- "r" (the default) to read from the command's standard
                 output, or "w" to write to its standard input.
    buffering -- handed to subprocess.Popen as bufsize; 0 and None are
                 rejected.

    Returns a _wrap_close proxy around an io.TextIOWrapper over the pipe.

    Raises TypeError when cmd is not a str, and ValueError for an
    unsupported mode or a request for an unbuffered stream.
    """
    if not isinstance(cmd, str):
        raise TypeError("invalid cmd type (%s, expected string)" % type(cmd))
    if mode not in ("r", "w"):
        # Wrap mode in a 1-tuple: a bare tuple on the right of % would be
        # unpacked as the format arguments and either raise TypeError or
        # report the wrong value instead of raising this ValueError.
        raise ValueError("invalid mode %r" % (mode,))
    if buffering == 0 or buffering is None:
        raise ValueError("popen() does not support unbuffered streams")

    import io
    import subprocess

    # NOTE: cmd is interpreted by the shell (shell=True); callers must not
    # build it from untrusted input.
    if mode == "r":
        proc = subprocess.Popen(cmd,
                                shell=True,
                                stdout=subprocess.PIPE,
                                bufsize=buffering)
        pipe = proc.stdout
    else:
        proc = subprocess.Popen(cmd,
                                shell=True,
                                stdin=subprocess.PIPE,
                                bufsize=buffering)
        pipe = proc.stdin
    return _wrap_close(io.TextIOWrapper(pipe), proc)
# Helper for popen() -- a proxy for a file whose close waits for the process | ||||
class _wrap_close:
    """Proxy for a popen() stream whose close() also waits for the process.

    Attributes that are not defined on the proxy are looked up on the
    wrapped stream, and iterating the proxy iterates the stream.
    """

    def __init__(self, stream, proc):
        self._stream = stream  # file object the proxy forwards to
        self._proc = proc      # object whose wait() gives the return code

    def close(self):
        """Close the stream, wait for the process and report its status.

        Returns None when the return code is 0.  Otherwise returns the
        return code unchanged when the module-level ``name`` is 'nt', and
        the return code shifted left by 8 bits elsewhere.
        """
        self._stream.close()
        returncode = self._proc.wait()
        if returncode == 0:
            return None
        if name == 'nt':
            return returncode
        else:
            return returncode << 8  # Shift left to match old behavior

    def __enter__(self):
        return self

    def __exit__(self, *args):
        # The value returned by close() is deliberately not returned here.
        self.close()

    def __getattr__(self, name):
        # __getattr__ only runs when normal lookup fails.  If _stream itself
        # is missing (instance created without __init__, e.g. by copy or
        # unpickling), reading self._stream below would re-enter this method
        # without end; fail with AttributeError instead of RecursionError.
        if name == "_stream":
            raise AttributeError(name)
        return getattr(self._stream, name)

    def __iter__(self):
        return iter(self._stream)
# Supply os.fdopen() | ||||
def fdopen(fd, *args, **kwargs):
    """Return a file object opened on the integer file descriptor fd.

    Any further positional and keyword arguments are forwarded to
    io.open() unchanged.

    Raises TypeError when fd is not an int.
    """
    if isinstance(fd, int):
        import io
        return io.open(fd, *args, **kwargs)
    raise TypeError("invalid fd type (%s, expected integer)" % type(fd))