Show More
Commit Description:
Merge pull request #17 from nattee/master...
Commit Description:
Merge pull request #17 from nattee/master
upgrade to current working snapshot
References:
File last commit:
Show/Diff file:
Action:
lib/assets/Lib/warnings.py
| 393 lines
| 13.9 KiB
| text/x-python
| PythonLexer
|
r584 | """Python part of the warnings subsystem.""" | |||
# Note: function level imports should *not* be used | ||||
# in this module as it may cause import lock deadlock. | ||||
# See bug 683658. | ||||
import linecache | ||||
import sys | ||||
# Public API exported by ``from warnings import *``.  Every public
# function defined in this module is listed, including simplefilter()
# and warn_explicit(), which were previously missing.
__all__ = ["warn", "warn_explicit", "showwarning", "formatwarning",
           "filterwarnings", "simplefilter", "resetwarnings",
           "catch_warnings"]
def showwarning(message, category, filename, lineno, file=None, line=None):
    """Hook to write a warning to a file; replace if you like.

    The text produced by formatwarning() is written to 'file'.  When
    'file' is None, sys.stderr is used.  If sys.stderr is itself None
    there is nowhere to write to, so the warning is silently dropped
    instead of failing with AttributeError.
    """
    if file is None:
        file = sys.stderr
        if file is None:
            # No standard error stream is available - this warning gets lost.
            return
    try:
        file.write(formatwarning(message, category, filename, lineno, line))
    except IOError:
        pass  # the file (probably stderr) is invalid - this warning gets lost.
def formatwarning(message, category, filename, lineno, line=None):
    """Return the standard textual form of a warning.

    The first output line is "filename:lineno: CategoryName: message".
    If 'line' is None the source line is looked up through linecache;
    a non-empty source line is stripped and added as a second line.
    """
    header = "%s:%s: %s: %s\n" % (filename, lineno, category.__name__, message)
    if line is None:
        line = linecache.getline(filename, lineno)
    if not line:
        return header
    return header + " %s\n" % line.strip()
def filterwarnings(action, message="", category=Warning, module="", lineno=0,
                   append=False):
    """Add an entry to the list of warnings filters.

    The entry is placed at the front of the list unless 'append' is true.

    'action' -- one of "error", "ignore", "always", "default", "module",
                or "once"
    'message' -- a regex that the warning message must match (compiled
                 case-insensitively)
    'category' -- a class that the warning must be a subclass of
    'module' -- a regex that the module name must match
    'lineno' -- an integer line number, 0 matches all warnings
    'append' -- if true, append to the list of filters
    """
    import re
    valid_actions = ("error", "ignore", "always", "default", "module", "once")
    assert action in valid_actions, "invalid action: %r" % (action,)
    assert isinstance(message, str), "message must be a string"
    assert isinstance(category, type), "category must be a class"
    assert issubclass(category, Warning), "category must be a Warning subclass"
    assert isinstance(module, str), "module must be a string"
    assert isinstance(lineno, int) and lineno >= 0, \
           "lineno must be an int >= 0"
    # Compile the message pattern first, then the module pattern.
    message_re = re.compile(message, re.I)
    module_re = re.compile(module)
    entry = (action, message_re, category, module_re, lineno)
    if not append:
        filters.insert(0, entry)
    else:
        filters.append(entry)
def simplefilter(action, category=Warning, lineno=0, append=False):
    """Add a simple entry to the list of warnings filters.

    A simple filter matches all modules and messages: both regex slots
    of the entry are None.  The entry is placed at the front of the
    list unless 'append' is true.

    'action' -- one of "error", "ignore", "always", "default", "module",
                or "once"
    'category' -- a class that the warning must be a subclass of
    'lineno' -- an integer line number, 0 matches all warnings
    'append' -- if true, append to the list of filters
    """
    valid_actions = ("error", "ignore", "always", "default", "module", "once")
    assert action in valid_actions, "invalid action: %r" % (action,)
    assert isinstance(lineno, int) and lineno >= 0, \
           "lineno must be an int >= 0"
    entry = (action, None, category, None, lineno)
    if not append:
        filters.insert(0, entry)
    else:
        filters.append(entry)
def resetwarnings():
    """Empty the list of warning filters in place, leaving no active filter."""
    # Mutate the existing list object rather than rebinding the name.
    del filters[:]
class _OptionError(Exception):
    """Raised by the option processing helpers for an unusable option."""
# Helper to process -W options passed via sys.warnoptions
def _processoptions(args):
    """Apply each option string in 'args' through _setoption().

    An option rejected with _OptionError is reported on sys.stderr and
    skipped; processing continues with the next option.
    """
    for option in args:
        try:
            _setoption(option)
        except _OptionError as err:
            print("Invalid -W option ignored: %s" % (err,), file=sys.stderr)
# Helper for _processoptions()
def _setoption(arg):
    """Parse one "action:message:category:module:lineno" option string
    and install the corresponding filter via filterwarnings().

    Missing trailing fields default to the empty string.  The message
    and module fields are regex-escaped, and a non-empty module is
    anchored with a trailing '$'.  Raises _OptionError for more than
    five fields or a lineno that is not a non-negative integer.
    """
    import re
    fields = arg.split(':')
    if len(fields) > 5:
        raise _OptionError("too many fields (max 5): %r" % (arg,))
    # Pad to exactly five fields so the unpacking below always succeeds.
    fields += [''] * (5 - len(fields))
    action, message, category, module, lineno = (
        field.strip() for field in fields)
    action = _getaction(action)
    message = re.escape(message)
    category = _getcategory(category)
    module = re.escape(module)
    if module:
        module += '$'
    if not lineno:
        lineno = 0
    else:
        try:
            lineno = int(lineno)
            if lineno < 0:
                raise ValueError
        except (ValueError, OverflowError):
            raise _OptionError("invalid lineno %r" % (lineno,))
    filterwarnings(action, message, category, module, lineno)
# Helper for _setoption()
def _getaction(action):
    """Expand a possibly abbreviated action name to its full form.

    An empty value means "default" and "all" is an alias for "always".
    Otherwise the first known action that starts with 'action' is
    returned; _OptionError is raised when none does.
    """
    if not action:
        return "default"
    if action == "all":
        return "always"  # Alias
    known_actions = ('default', 'always', 'ignore', 'module', 'once', 'error')
    candidates = [name for name in known_actions if name.startswith(action)]
    if candidates:
        return candidates[0]
    raise _OptionError("invalid action: %r" % (action,))
# Helper for _setoption()
def _getcategory(category):
    """Resolve the category field of a -W option to a Warning subclass.

    An empty value means Warning.  A name without a dot is looked up
    among the builtins; a dotted name is split at its last dot into a
    module to import and an attribute to fetch from it.

    Raises _OptionError when the module cannot be imported, when the
    name does not exist, or when the object found is not a class
    derived from Warning.
    """
    if not category:
        return Warning
    if '.' in category:
        module, _, klass = category.rpartition('.')
        try:
            m = __import__(module, None, None, [klass])
        except (ImportError, ValueError):
            # ValueError covers an empty module part, as in ".Name".
            raise _OptionError("invalid module name: %r" % (module,))
    else:
        # Look the bare name up as an attribute instead of eval()-ing
        # text that comes from the command line.
        import builtins as m
        klass = category
    try:
        cat = getattr(m, klass)
    except AttributeError:
        raise _OptionError("unknown warning category: %r" % (category,))
    # issubclass() raises TypeError for non-classes, so check the type first.
    if not (isinstance(cat, type) and issubclass(cat, Warning)):
        raise _OptionError("invalid warning category: %r" % (category,))
    return cat
# Code typically replaced by _warnings
def warn(message, category=None, stacklevel=1):
    """Issue a warning, or maybe ignore it or raise an exception.

    'message' is either the warning text or a Warning instance; an
    instance supplies its own category.  Without a category,
    UserWarning is used.  'stacklevel' selects the frame whose globals
    and line number are reported.  The collected details are passed on
    to warn_explicit().
    """
    # A Warning instance determines the category; otherwise apply the default.
    if isinstance(message, Warning):
        category = message.__class__
    elif category is None:
        category = UserWarning
    assert issubclass(category, Warning)
    # Get context information.  The frame lookup must stay in this
    # function so that 'stacklevel' counts from the right place.
    try:
        frame = sys._getframe(stacklevel)
    except ValueError:
        namespace = sys.__dict__
        lineno = 1
    else:
        namespace = frame.f_globals
        lineno = frame.f_lineno
    module = namespace['__name__'] if '__name__' in namespace else "<string>"
    filename = namespace.get('__file__')
    if not filename:
        if module == "__main__":
            try:
                filename = sys.argv[0]
            except AttributeError:
                # embedded interpreters don't have sys.argv, see bug #839151
                filename = '__main__'
        if not filename:
            filename = module
    elif filename.lower().endswith((".pyc", ".pyo")):
        # Drop the final character of a compiled-file suffix.
        filename = filename[:-1]
    registry = namespace.setdefault("__warningregistry__", {})
    warn_explicit(message, category, filename, lineno, module, registry,
                  namespace)
def warn_explicit(message, category, filename, lineno,
                  module=None, registry=None, module_globals=None):
    """Issue a warning with explicitly supplied context.

    The first entry in 'filters' matching the warning text, category,
    module and line number decides the action; 'defaultaction' applies
    when none matches.  'registry' records warnings already handled, so
    repeats can be suppressed.  Depending on the action the warning is
    dropped, raised as an exception, or passed to showwarning().

    Raises RuntimeError for an unrecognized action and TypeError when
    showwarning is not callable.
    """
    lineno = int(lineno)
    if module is None:
        module = filename or "<unknown>"
        if module[-3:].lower() == ".py":
            module = module[:-3]  # XXX What about leading pathname?
    if registry is None:
        registry = {}
    if not isinstance(message, Warning):
        text = message
        message = category(message)
    else:
        text = str(message)
        category = message.__class__
    key = (text, category, lineno)
    # Quick test for common case
    if registry.get(key):
        return
    # Search the filters; the first entry passing every test wins.
    for item in filters:
        action, msg, cat, mod, ln = item
        if msg is not None and not msg.match(text):
            continue
        if not issubclass(category, cat):
            continue
        if mod is not None and not mod.match(module):
            continue
        if ln != 0 and lineno != ln:
            continue
        break
    else:
        action = defaultaction
    # Early exit actions
    if action == "ignore":
        registry[key] = 1
        return

    # Prime the linecache for formatting, in case the
    # "file" is actually in a zipfile or something.
    linecache.getlines(filename, module_globals)

    if action == "error":
        raise message
    # Other actions
    if action == "once":
        registry[key] = 1
        once_key = (text, category)
        if onceregistry.get(once_key):
            return
        onceregistry[once_key] = 1
    elif action == "module":
        registry[key] = 1
        module_key = (text, category, 0)
        if registry.get(module_key):
            return
        registry[module_key] = 1
    elif action == "default":
        registry[key] = 1
    elif action != "always":
        # Unrecognized actions are errors
        raise RuntimeError(
              "Unrecognized action (%r) in warnings.filters:\n %s" %
              (action, item))
    if not callable(showwarning):
        raise TypeError("warnings.showwarning() must be set to a "
                        "function or method")
    # Print message and context
    showwarning(message, category, filename, lineno)
class WarningMessage(object):
    """Record of the arguments given to one showwarning() call."""

    _WARNING_DETAILS = ("message", "category", "filename", "lineno", "file",
                        "line")

    def __init__(self, message, category, filename, lineno, file=None,
                 line=None):
        """Store every argument as an attribute of the same name."""
        values = (message, category, filename, lineno, file, line)
        for name, value in zip(self._WARNING_DETAILS, values):
            setattr(self, name, value)
        # Keep the category's name (None without a category) for __str__().
        self._category_name = category.__name__ if category else None

    def __str__(self):
        """Return a brace-delimited summary of the recorded details."""
        details = (self.message, self._category_name, self.filename,
                   self.lineno, self.line)
        return ("{message : %r, category : %r, filename : %r, lineno : %s, "
                "line : %r}" % details)
class catch_warnings(object):
    """Context manager that saves the warnings filter list on entry and
    restores it, together with showwarning(), on exit.

    With 'record' true, showwarning() is replaced for the duration of
    the block by a function that appends a WarningMessage to a list,
    and that list is what the context manager returns.  Otherwise the
    context manager returns None.

    'module' names the module object whose 'filters' and 'showwarning'
    attributes are manipulated; it defaults to sys.modules['warnings'].
    This argument is only useful when testing the warnings module itself.
    """

    def __init__(self, *, record=False, module=None):
        """Remember whether to record warnings and which module to patch.

        Both arguments are keyword-only.
        """
        if module is None:
            module = sys.modules['warnings']
        self._record = record
        self._module = module
        self._entered = False

    def __repr__(self):
        parts = []
        if self._record:
            parts.append("record=True")
        if self._module is not sys.modules['warnings']:
            parts.append("module=%r" % self._module)
        return "%s(%s)" % (type(self).__name__, ", ".join(parts))

    def __enter__(self):
        if self._entered:
            raise RuntimeError("Cannot enter %r twice" % self)
        self._entered = True
        target = self._module
        # Save the live list and install a copy for use inside the block.
        self._filters = target.filters
        target.filters = self._filters[:]
        self._showwarning = target.showwarning
        if not self._record:
            return None
        log = []

        def showwarning(*args, **kwargs):
            log.append(WarningMessage(*args, **kwargs))

        target.showwarning = showwarning
        return log

    def __exit__(self, *exc_info):
        if not self._entered:
            raise RuntimeError("Cannot exit %r without entering first" % self)
        target = self._module
        target.filters = self._filters
        target.showwarning = self._showwarning
# filters contains a sequence of filter 5-tuples
# The components of the 5-tuple are:
# - an action: error, ignore, always, default, module, or once
# - a compiled regex that must match the warning message
# - a class representing the warning category
# - a compiled regex that must match the module that is being warned
# - a line number for the line being warned about, or 0 to mean any line
# If either of the compiled regexes is None, it matches anything.
# Use the state and functions of the _warnings module when it can be
# imported; otherwise start from empty pure-Python state.
_warnings_defaults = False
try:
    from _warnings import (filters, _defaultaction, _onceregistry,
                           warn, warn_explicit)
except ImportError:
    filters = []
    defaultaction = "default"
    onceregistry = {}
else:
    defaultaction = _defaultaction
    onceregistry = _onceregistry
    _warnings_defaults = True

# Module initialization
_processoptions(sys.warnoptions)
if not _warnings_defaults:
    # Without _warnings, the default filters are installed here.
    silence = [ImportWarning, PendingDeprecationWarning, DeprecationWarning]
    for cls in silence:
        simplefilter("ignore", category=cls)
    bytes_warning = sys.flags.bytes_warning
    bytes_action = ("error" if bytes_warning > 1
                    else "default" if bytes_warning
                    else "ignore")
    simplefilter(bytes_action, category=BytesWarning, append=1)
    # resource usage warnings are enabled by default in pydebug mode
    resource_action = "always" if hasattr(sys, 'gettotalrefcount') else "ignore"
    simplefilter(resource_action, category=ResourceWarning, append=1)

del _warnings_defaults