| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601160216031604160516061607160816091610161116121613161416151616161716181619162016211622162316241625162616271628162916301631163216331634163516361637163816391640164116421643164416451646164716481649165016511652165316541655165616571658165916601661166216631664166516661667166816691670167116721673167416751676167716781679168016811682168316841685168616871688168916901691169216931694169516961697169816991700170117021703170417051706170717081709171017111712171317141715171617171718171917201721172217231724172517261727172817291730173117321733173417351736173717381739174017411742174317441745174617471748174917501751175217531754175517561757175817591760176117621763176417651766176717681769177017711772177317741775177617771778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227722782279228022812282228322842285228622872288228922902291229222932294229522962297229822992300230123022303230423052306230723082309231023112312231323142315231623172318231923202321232223232324232523262327232823292330233123322333233423352336233723382339234023412342234323442345234623472348234923502351235223532354235523562357235823592360236123622363236423652366236723682369237023712372237323742375237623772378237923802381238223832384238523862387238823892390239123922393239423952396239723982399240024012402240324042405240624072408240924102411241224132414241524162417241824192420242124222423242424252426242724282429243024312432243324342435243624372438243924402441244224432444244524462447244824492450245124522453245424552456245724582459246024612462246324642465246624672468246924702471247224732474247524762477247824792480248124822483248424852486248724882489249024912492249324942495249624972498249925002501250225032504250525062507250825092510251125122513251425152516251725182519252025212522252325242525252625272528252925302531253225332534253525362537253825392540254125422543254425452546254725482549255025512552255325542555255625572558255925602561256225632564256525662567256825692570257125722573257425752576257725782579258025812582258325842585258625872588258925902591259225932594259525962597259825992600260126022603260426052606260726082609261026112612261326142615261626172618261926202621262226232624262526262627262826292630263126322633263426352636263726382639264026412642264326442645264626472648264926502651265226532654265526562657265826592660266126622663266426652666266726682669267026712672267326742675267626772678267926802681268226832684268526862687268826892690269126922693269426952696269726982699270027012702270327042705270627072708270927102711271227132714271527162717271827192720272127222723272427252726272727282729273027312732273327342735273627372738273927402741274227432744274527462747274827492750275127522753275427552756275727582759276027612762276327642765276627672768276927702771277227732774277527762777277827792780278127822783278427852786278727882789279027912792279327942795279627972798279928002801280228032804280528062807280828092810281128122813281428152816281728182819282028212822282328242825282628272828282928302831283228332834283528362837283828392840284128422843284428452846284728482849285028512852285328542855285628572858285928602861286228632864286528662867286828692870287128722873287428752876287728782879288028812882288328842885288628872888288928902891289228932894289528962897289828992900290129022903290429052906290729082909291029112912291329142915291629172918291929202921292229232924292529262927292829292930293129322933293429352936293729382939294029412942294329442945294629472948294929502951295229532954295529562957295829592960296129622963296429652966296729682969297029712972297329742975297629772978297929802981298229832984298529862987 |
- """Supporting definitions for the Python regression tests."""
- if __name__ != 'test.support':
- raise ImportError('support must be imported from the test package')
- import collections.abc
- import contextlib
- import datetime
- import errno
- import faulthandler
- import fnmatch
- import functools
- import gc
- import importlib
- import importlib.util
- import io
- import logging.handlers
- import nntplib
- import os
- import platform
- import re
- import shutil
- import socket
- import stat
- import struct
- import subprocess
- import sys
- import sysconfig
- import tempfile
- import _thread
- import threading
- import time
- import types
- import unittest
- import urllib.error
- import warnings
- from .testresult import get_test_runner
- try:
- import multiprocessing.process
- except ImportError:
- multiprocessing = None
- try:
- import zlib
- except ImportError:
- zlib = None
- try:
- import gzip
- except ImportError:
- gzip = None
- try:
- import bz2
- except ImportError:
- bz2 = None
- try:
- import lzma
- except ImportError:
- lzma = None
- try:
- import resource
- except ImportError:
- resource = None
- __all__ = [
- # globals
- "PIPE_MAX_SIZE", "verbose", "max_memuse", "use_resources", "failfast",
- # exceptions
- "Error", "TestFailed", "TestDidNotRun", "ResourceDenied",
- # imports
- "import_module", "import_fresh_module", "CleanImport",
- # modules
- "unload", "forget",
- # io
- "record_original_stdout", "get_original_stdout", "captured_stdout",
- "captured_stdin", "captured_stderr",
- # filesystem
- "TESTFN", "SAVEDCWD", "unlink", "rmtree", "temp_cwd", "findfile",
- "create_empty_file", "can_symlink", "fs_is_case_insensitive",
- # unittest
- "is_resource_enabled", "requires", "requires_freebsd_version",
- "requires_linux_version", "requires_mac_ver", "check_syntax_error",
- "TransientResource", "time_out", "socket_peer_reset", "ioerror_peer_reset",
- "transient_internet", "BasicTestRunner", "run_unittest", "run_doctest",
- "skip_unless_symlink", "requires_gzip", "requires_bz2", "requires_lzma",
- "bigmemtest", "bigaddrspacetest", "cpython_only", "get_attribute",
- "requires_IEEE_754", "skip_unless_xattr", "requires_zlib",
- "anticipate_failure", "load_package_tests", "detect_api_mismatch",
- "check__all__", "skip_unless_bind_unix_socket",
- # sys
- "is_jython", "is_android", "check_impl_detail", "unix_shell",
- "setswitchinterval",
- # network
- "HOST", "IPV6_ENABLED", "find_unused_port", "bind_port", "open_urlresource",
- "bind_unix_socket",
- # processes
- 'temp_umask', "reap_children",
- # logging
- "TestHandler",
- # threads
- "threading_setup", "threading_cleanup", "reap_threads", "start_threads",
- # miscellaneous
- "check_warnings", "check_no_resource_warning", "EnvironmentVarGuard",
- "run_with_locale", "swap_item",
- "swap_attr", "Matcher", "set_memlimit", "SuppressCrashReport", "sortdict",
- "run_with_tz", "PGO", "missing_compiler_executable", "fd_count",
- "ALWAYS_EQ", "LARGEST", "SMALLEST"
- ]
- class Error(Exception):
- """Base class for regression test exceptions."""
- class TestFailed(Error):
- """Test failed."""
- class TestDidNotRun(Error):
- """Test did not run any subtests."""
- class ResourceDenied(unittest.SkipTest):
- """Test skipped because it requested a disallowed resource.
- This is raised when a test calls requires() for a resource that
- has not be enabled. It is used to distinguish between expected
- and unexpected skips.
- """
- @contextlib.contextmanager
- def _ignore_deprecated_imports(ignore=True):
- """Context manager to suppress package and module deprecation
- warnings when importing them.
- If ignore is False, this context manager has no effect.
- """
- if ignore:
- with warnings.catch_warnings():
- warnings.filterwarnings("ignore", ".+ (module|package)",
- DeprecationWarning)
- yield
- else:
- yield
- def import_module(name, deprecated=False, *, required_on=()):
- """Import and return the module to be tested, raising SkipTest if
- it is not available.
- If deprecated is True, any module or package deprecation messages
- will be suppressed. If a module is required on a platform but optional for
- others, set required_on to an iterable of platform prefixes which will be
- compared against sys.platform.
- """
- with _ignore_deprecated_imports(deprecated):
- try:
- return importlib.import_module(name)
- except ImportError as msg:
- if sys.platform.startswith(tuple(required_on)):
- raise
- raise unittest.SkipTest(str(msg))
- def _save_and_remove_module(name, orig_modules):
- """Helper function to save and remove a module from sys.modules
- Raise ImportError if the module can't be imported.
- """
- # try to import the module and raise an error if it can't be imported
- if name not in sys.modules:
- __import__(name)
- del sys.modules[name]
- for modname in list(sys.modules):
- if modname == name or modname.startswith(name + '.'):
- orig_modules[modname] = sys.modules[modname]
- del sys.modules[modname]
- def _save_and_block_module(name, orig_modules):
- """Helper function to save and block a module in sys.modules
- Return True if the module was in sys.modules, False otherwise.
- """
- saved = True
- try:
- orig_modules[name] = sys.modules[name]
- except KeyError:
- saved = False
- sys.modules[name] = None
- return saved
- def anticipate_failure(condition):
- """Decorator to mark a test that is known to be broken in some cases
- Any use of this decorator should have a comment identifying the
- associated tracker issue.
- """
- if condition:
- return unittest.expectedFailure
- return lambda f: f
- def load_package_tests(pkg_dir, loader, standard_tests, pattern):
- """Generic load_tests implementation for simple test packages.
- Most packages can implement load_tests using this function as follows:
- def load_tests(*args):
- return load_package_tests(os.path.dirname(__file__), *args)
- """
- if pattern is None:
- pattern = "test*"
- top_dir = os.path.dirname( # Lib
- os.path.dirname( # test
- os.path.dirname(__file__))) # support
- package_tests = loader.discover(start_dir=pkg_dir,
- top_level_dir=top_dir,
- pattern=pattern)
- standard_tests.addTests(package_tests)
- return standard_tests
- def import_fresh_module(name, fresh=(), blocked=(), deprecated=False):
- """Import and return a module, deliberately bypassing sys.modules.
- This function imports and returns a fresh copy of the named Python module
- by removing the named module from sys.modules before doing the import.
- Note that unlike reload, the original module is not affected by
- this operation.
- *fresh* is an iterable of additional module names that are also removed
- from the sys.modules cache before doing the import.
- *blocked* is an iterable of module names that are replaced with None
- in the module cache during the import to ensure that attempts to import
- them raise ImportError.
- The named module and any modules named in the *fresh* and *blocked*
- parameters are saved before starting the import and then reinserted into
- sys.modules when the fresh import is complete.
- Module and package deprecation messages are suppressed during this import
- if *deprecated* is True.
- This function will raise ImportError if the named module cannot be
- imported.
- """
- # NOTE: test_heapq, test_json and test_warnings include extra sanity checks
- # to make sure that this utility function is working as expected
- with _ignore_deprecated_imports(deprecated):
- # Keep track of modules saved for later restoration as well
- # as those which just need a blocking entry removed
- orig_modules = {}
- names_to_remove = []
- _save_and_remove_module(name, orig_modules)
- try:
- for fresh_name in fresh:
- _save_and_remove_module(fresh_name, orig_modules)
- for blocked_name in blocked:
- if not _save_and_block_module(blocked_name, orig_modules):
- names_to_remove.append(blocked_name)
- fresh_module = importlib.import_module(name)
- except ImportError:
- fresh_module = None
- finally:
- for orig_name, module in orig_modules.items():
- sys.modules[orig_name] = module
- for name_to_remove in names_to_remove:
- del sys.modules[name_to_remove]
- return fresh_module
- def get_attribute(obj, name):
- """Get an attribute, raising SkipTest if AttributeError is raised."""
- try:
- attribute = getattr(obj, name)
- except AttributeError:
- raise unittest.SkipTest("object %r has no attribute %r" % (obj, name))
- else:
- return attribute
- verbose = 1 # Flag set to 0 by regrtest.py
- use_resources = None # Flag set to [] by regrtest.py
- max_memuse = 0 # Disable bigmem tests (they will still be run with
- # small sizes, to make sure they work.)
- real_max_memuse = 0
- junit_xml_list = None # list of testsuite XML elements
- failfast = False
- # _original_stdout is meant to hold stdout at the time regrtest began.
- # This may be "the real" stdout, or IDLE's emulation of stdout, or whatever.
- # The point is to have some flavor of stdout the user can actually see.
- _original_stdout = None
- def record_original_stdout(stdout):
- global _original_stdout
- _original_stdout = stdout
- def get_original_stdout():
- return _original_stdout or sys.stdout
- def unload(name):
- try:
- del sys.modules[name]
- except KeyError:
- pass
- def _force_run(path, func, *args):
- try:
- return func(*args)
- except OSError as err:
- if verbose >= 2:
- print('%s: %s' % (err.__class__.__name__, err))
- print('re-run %s%r' % (func.__name__, args))
- os.chmod(path, stat.S_IRWXU)
- return func(*args)
- if sys.platform.startswith("win"):
- def _waitfor(func, pathname, waitall=False):
- # Perform the operation
- func(pathname)
- # Now setup the wait loop
- if waitall:
- dirname = pathname
- else:
- dirname, name = os.path.split(pathname)
- dirname = dirname or '.'
- # Check for `pathname` to be removed from the filesystem.
- # The exponential backoff of the timeout amounts to a total
- # of ~1 second after which the deletion is probably an error
- # anyway.
- # Testing on an i7@4.3GHz shows that usually only 1 iteration is
- # required when contention occurs.
- timeout = 0.001
- while timeout < 1.0:
- # Note we are only testing for the existence of the file(s) in
- # the contents of the directory regardless of any security or
- # access rights. If we have made it this far, we have sufficient
- # permissions to do that much using Python's equivalent of the
- # Windows API FindFirstFile.
- # Other Windows APIs can fail or give incorrect results when
- # dealing with files that are pending deletion.
- L = os.listdir(dirname)
- if not (L if waitall else name in L):
- return
- # Increase the timeout and try again
- time.sleep(timeout)
- timeout *= 2
- warnings.warn('tests may fail, delete still pending for ' + pathname,
- RuntimeWarning, stacklevel=4)
- def _unlink(filename):
- _waitfor(os.unlink, filename)
- def _rmdir(dirname):
- _waitfor(os.rmdir, dirname)
- def _rmtree(path):
- def _rmtree_inner(path):
- for name in _force_run(path, os.listdir, path):
- fullname = os.path.join(path, name)
- try:
- mode = os.lstat(fullname).st_mode
- except OSError as exc:
- print("support.rmtree(): os.lstat(%r) failed with %s" % (fullname, exc),
- file=sys.__stderr__)
- mode = 0
- if stat.S_ISDIR(mode):
- _waitfor(_rmtree_inner, fullname, waitall=True)
- _force_run(fullname, os.rmdir, fullname)
- else:
- _force_run(fullname, os.unlink, fullname)
- _waitfor(_rmtree_inner, path, waitall=True)
- _waitfor(lambda p: _force_run(p, os.rmdir, p), path)
- def _longpath(path):
- try:
- import ctypes
- except ImportError:
- # No ctypes means we can't expands paths.
- pass
- else:
- buffer = ctypes.create_unicode_buffer(len(path) * 2)
- length = ctypes.windll.kernel32.GetLongPathNameW(path, buffer,
- len(buffer))
- if length:
- return buffer[:length]
- return path
- else:
- _unlink = os.unlink
- _rmdir = os.rmdir
- def _rmtree(path):
- try:
- shutil.rmtree(path)
- return
- except OSError:
- pass
- def _rmtree_inner(path):
- for name in _force_run(path, os.listdir, path):
- fullname = os.path.join(path, name)
- try:
- mode = os.lstat(fullname).st_mode
- except OSError:
- mode = 0
- if stat.S_ISDIR(mode):
- _rmtree_inner(fullname)
- _force_run(path, os.rmdir, fullname)
- else:
- _force_run(path, os.unlink, fullname)
- _rmtree_inner(path)
- os.rmdir(path)
- def _longpath(path):
- return path
- def unlink(filename):
- try:
- _unlink(filename)
- except (FileNotFoundError, NotADirectoryError):
- pass
- def rmdir(dirname):
- try:
- _rmdir(dirname)
- except FileNotFoundError:
- pass
- def rmtree(path):
- try:
- _rmtree(path)
- except FileNotFoundError:
- pass
- def make_legacy_pyc(source):
- """Move a PEP 3147/488 pyc file to its legacy pyc location.
- :param source: The file system path to the source file. The source file
- does not need to exist, however the PEP 3147/488 pyc file must exist.
- :return: The file system path to the legacy pyc file.
- """
- pyc_file = importlib.util.cache_from_source(source)
- up_one = os.path.dirname(os.path.abspath(source))
- legacy_pyc = os.path.join(up_one, source + 'c')
- os.rename(pyc_file, legacy_pyc)
- return legacy_pyc
- def forget(modname):
- """'Forget' a module was ever imported.
- This removes the module from sys.modules and deletes any PEP 3147/488 or
- legacy .pyc files.
- """
- unload(modname)
- for dirname in sys.path:
- source = os.path.join(dirname, modname + '.py')
- # It doesn't matter if they exist or not, unlink all possible
- # combinations of PEP 3147/488 and legacy pyc files.
- unlink(source + 'c')
- for opt in ('', 1, 2):
- unlink(importlib.util.cache_from_source(source, optimization=opt))
- # Check whether a gui is actually available
- def _is_gui_available():
- if hasattr(_is_gui_available, 'result'):
- return _is_gui_available.result
- reason = None
- if sys.platform.startswith('win'):
- # if Python is running as a service (such as the buildbot service),
- # gui interaction may be disallowed
- import ctypes
- import ctypes.wintypes
- UOI_FLAGS = 1
- WSF_VISIBLE = 0x0001
- class USEROBJECTFLAGS(ctypes.Structure):
- _fields_ = [("fInherit", ctypes.wintypes.BOOL),
- ("fReserved", ctypes.wintypes.BOOL),
- ("dwFlags", ctypes.wintypes.DWORD)]
- dll = ctypes.windll.user32
- h = dll.GetProcessWindowStation()
- if not h:
- raise ctypes.WinError()
- uof = USEROBJECTFLAGS()
- needed = ctypes.wintypes.DWORD()
- res = dll.GetUserObjectInformationW(h,
- UOI_FLAGS,
- ctypes.byref(uof),
- ctypes.sizeof(uof),
- ctypes.byref(needed))
- if not res:
- raise ctypes.WinError()
- if not bool(uof.dwFlags & WSF_VISIBLE):
- reason = "gui not available (WSF_VISIBLE flag not set)"
- elif sys.platform == 'darwin':
- # The Aqua Tk implementations on OS X can abort the process if
- # being called in an environment where a window server connection
- # cannot be made, for instance when invoked by a buildbot or ssh
- # process not running under the same user id as the current console
- # user. To avoid that, raise an exception if the window manager
- # connection is not available.
- from ctypes import cdll, c_int, pointer, Structure
- from ctypes.util import find_library
- app_services = cdll.LoadLibrary(find_library("ApplicationServices"))
- if app_services.CGMainDisplayID() == 0:
- reason = "gui tests cannot run without OS X window manager"
- else:
- class ProcessSerialNumber(Structure):
- _fields_ = [("highLongOfPSN", c_int),
- ("lowLongOfPSN", c_int)]
- psn = ProcessSerialNumber()
- psn_p = pointer(psn)
- if ( (app_services.GetCurrentProcess(psn_p) < 0) or
- (app_services.SetFrontProcess(psn_p) < 0) ):
- reason = "cannot run without OS X gui process"
- # check on every platform whether tkinter can actually do anything
- if not reason:
- try:
- from tkinter import Tk
- root = Tk()
- root.withdraw()
- root.update()
- root.destroy()
- except Exception as e:
- err_string = str(e)
- if len(err_string) > 50:
- err_string = err_string[:50] + ' [...]'
- reason = 'Tk unavailable due to {}: {}'.format(type(e).__name__,
- err_string)
- _is_gui_available.reason = reason
- _is_gui_available.result = not reason
- return _is_gui_available.result
- def is_resource_enabled(resource):
- """Test whether a resource is enabled.
- Known resources are set by regrtest.py. If not running under regrtest.py,
- all resources are assumed enabled unless use_resources has been set.
- """
- return use_resources is None or resource in use_resources
- def requires(resource, msg=None):
- """Raise ResourceDenied if the specified resource is not available."""
- if not is_resource_enabled(resource):
- if msg is None:
- msg = "Use of the %r resource not enabled" % resource
- raise ResourceDenied(msg)
- if resource == 'gui' and not _is_gui_available():
- raise ResourceDenied(_is_gui_available.reason)
- def _requires_unix_version(sysname, min_version):
- """Decorator raising SkipTest if the OS is `sysname` and the version is less
- than `min_version`.
- For example, @_requires_unix_version('FreeBSD', (7, 2)) raises SkipTest if
- the FreeBSD version is less than 7.2.
- """
- import platform
- min_version_txt = '.'.join(map(str, min_version))
- version_txt = platform.release().split('-', 1)[0]
- if platform.system() == sysname:
- try:
- version = tuple(map(int, version_txt.split('.')))
- except ValueError:
- skip = False
- else:
- skip = version < min_version
- else:
- skip = False
- return unittest.skipIf(
- skip,
- f"{sysname} version {min_version_txt} or higher required, not "
- f"{version_txt}"
- )
- def requires_freebsd_version(*min_version):
- """Decorator raising SkipTest if the OS is FreeBSD and the FreeBSD version is
- less than `min_version`.
- For example, @requires_freebsd_version(7, 2) raises SkipTest if the FreeBSD
- version is less than 7.2.
- """
- return _requires_unix_version('FreeBSD', min_version)
- def requires_linux_version(*min_version):
- """Decorator raising SkipTest if the OS is Linux and the Linux version is
- less than `min_version`.
- For example, @requires_linux_version(2, 6, 32) raises SkipTest if the Linux
- version is less than 2.6.32.
- """
- return _requires_unix_version('Linux', min_version)
- def requires_mac_ver(*min_version):
- """Decorator raising SkipTest if the OS is Mac OS X and the OS X
- version if less than min_version.
- For example, @requires_mac_ver(10, 5) raises SkipTest if the OS X version
- is lesser than 10.5.
- """
- def decorator(func):
- @functools.wraps(func)
- def wrapper(*args, **kw):
- if sys.platform == 'darwin':
- version_txt = platform.mac_ver()[0]
- try:
- version = tuple(map(int, version_txt.split('.')))
- except ValueError:
- pass
- else:
- if version < min_version:
- min_version_txt = '.'.join(map(str, min_version))
- raise unittest.SkipTest(
- "Mac OS X %s or higher required, not %s"
- % (min_version_txt, version_txt))
- return func(*args, **kw)
- wrapper.min_version = min_version
- return wrapper
- return decorator
- HOST = "localhost"
- HOSTv4 = "127.0.0.1"
- HOSTv6 = "::1"
- def find_unused_port(family=socket.AF_INET, socktype=socket.SOCK_STREAM):
- """Returns an unused port that should be suitable for binding. This is
- achieved by creating a temporary socket with the same family and type as
- the 'sock' parameter (default is AF_INET, SOCK_STREAM), and binding it to
- the specified host address (defaults to 0.0.0.0) with the port set to 0,
- eliciting an unused ephemeral port from the OS. The temporary socket is
- then closed and deleted, and the ephemeral port is returned.
- Either this method or bind_port() should be used for any tests where a
- server socket needs to be bound to a particular port for the duration of
- the test. Which one to use depends on whether the calling code is creating
- a python socket, or if an unused port needs to be provided in a constructor
- or passed to an external program (i.e. the -accept argument to openssl's
- s_server mode). Always prefer bind_port() over find_unused_port() where
- possible. Hard coded ports should *NEVER* be used. As soon as a server
- socket is bound to a hard coded port, the ability to run multiple instances
- of the test simultaneously on the same host is compromised, which makes the
- test a ticking time bomb in a buildbot environment. On Unix buildbots, this
- may simply manifest as a failed test, which can be recovered from without
- intervention in most cases, but on Windows, the entire python process can
- completely and utterly wedge, requiring someone to log in to the buildbot
- and manually kill the affected process.
- (This is easy to reproduce on Windows, unfortunately, and can be traced to
- the SO_REUSEADDR socket option having different semantics on Windows versus
- Unix/Linux. On Unix, you can't have two AF_INET SOCK_STREAM sockets bind,
- listen and then accept connections on identical host/ports. An EADDRINUSE
- OSError will be raised at some point (depending on the platform and
- the order bind and listen were called on each socket).
- However, on Windows, if SO_REUSEADDR is set on the sockets, no EADDRINUSE
- will ever be raised when attempting to bind two identical host/ports. When
- accept() is called on each socket, the second caller's process will steal
- the port from the first caller, leaving them both in an awkwardly wedged
- state where they'll no longer respond to any signals or graceful kills, and
- must be forcibly killed via OpenProcess()/TerminateProcess().
- The solution on Windows is to use the SO_EXCLUSIVEADDRUSE socket option
- instead of SO_REUSEADDR, which effectively affords the same semantics as
- SO_REUSEADDR on Unix. Given the propensity of Unix developers in the Open
- Source world compared to Windows ones, this is a common mistake. A quick
- look over OpenSSL's 0.9.8g source shows that they use SO_REUSEADDR when
- openssl.exe is called with the 's_server' option, for example. See
- http://bugs.python.org/issue2550 for more info. The following site also
- has a very thorough description about the implications of both REUSEADDR
- and EXCLUSIVEADDRUSE on Windows:
- http://msdn2.microsoft.com/en-us/library/ms740621(VS.85).aspx)
- XXX: although this approach is a vast improvement on previous attempts to
- elicit unused ports, it rests heavily on the assumption that the ephemeral
- port returned to us by the OS won't immediately be dished back out to some
- other process when we close and delete our temporary socket but before our
- calling code has a chance to bind the returned port. We can deal with this
- issue if/when we come across it.
- """
- tempsock = socket.socket(family, socktype)
- port = bind_port(tempsock)
- tempsock.close()
- del tempsock
- return port
- def bind_port(sock, host=HOST):
- """Bind the socket to a free port and return the port number. Relies on
- ephemeral ports in order to ensure we are using an unbound port. This is
- important as many tests may be running simultaneously, especially in a
- buildbot environment. This method raises an exception if the sock.family
- is AF_INET and sock.type is SOCK_STREAM, *and* the socket has SO_REUSEADDR
- or SO_REUSEPORT set on it. Tests should *never* set these socket options
- for TCP/IP sockets. The only case for setting these options is testing
- multicasting via multiple UDP sockets.
- Additionally, if the SO_EXCLUSIVEADDRUSE socket option is available (i.e.
- on Windows), it will be set on the socket. This will prevent anyone else
- from bind()'ing to our host/port for the duration of the test.
- """
- if sock.family == socket.AF_INET and sock.type == socket.SOCK_STREAM:
- if hasattr(socket, 'SO_REUSEADDR'):
- if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR) == 1:
- raise TestFailed("tests should never set the SO_REUSEADDR " \
- "socket option on TCP/IP sockets!")
- if hasattr(socket, 'SO_REUSEPORT'):
- try:
- if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT) == 1:
- raise TestFailed("tests should never set the SO_REUSEPORT " \
- "socket option on TCP/IP sockets!")
- except OSError:
- # Python's socket module was compiled using modern headers
- # thus defining SO_REUSEPORT but this process is running
- # under an older kernel that does not support SO_REUSEPORT.
- pass
- if hasattr(socket, 'SO_EXCLUSIVEADDRUSE'):
- sock.setsockopt(socket.SOL_SOCKET, socket.SO_EXCLUSIVEADDRUSE, 1)
- sock.bind((host, 0))
- port = sock.getsockname()[1]
- return port
- def bind_unix_socket(sock, addr):
- """Bind a unix socket, raising SkipTest if PermissionError is raised."""
- assert sock.family == socket.AF_UNIX
- try:
- sock.bind(addr)
- except PermissionError:
- sock.close()
- raise unittest.SkipTest('cannot bind AF_UNIX sockets')
- def _is_ipv6_enabled():
- """Check whether IPv6 is enabled on this host."""
- if socket.has_ipv6:
- sock = None
- try:
- sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
- sock.bind((HOSTv6, 0))
- return True
- except OSError:
- pass
- finally:
- if sock:
- sock.close()
- return False
- IPV6_ENABLED = _is_ipv6_enabled()
- def system_must_validate_cert(f):
- """Skip the test on TLS certificate validation failures."""
- @functools.wraps(f)
- def dec(*args, **kwargs):
- try:
- f(*args, **kwargs)
- except OSError as e:
- if "CERTIFICATE_VERIFY_FAILED" in str(e):
- raise unittest.SkipTest("system does not contain "
- "necessary certificates")
- raise
- return dec
- # A constant likely larger than the underlying OS pipe buffer size, to
- # make writes blocking.
- # Windows limit seems to be around 512 B, and many Unix kernels have a
- # 64 KiB pipe buffer size or 16 * PAGE_SIZE: take a few megs to be sure.
- # (see issue #17835 for a discussion of this number).
- PIPE_MAX_SIZE = 4 * 1024 * 1024 + 1
- # A constant likely larger than the underlying OS socket buffer size, to make
- # writes blocking.
- # The socket buffer sizes can usually be tuned system-wide (e.g. through sysctl
- # on Linux), or on a per-socket basis (SO_SNDBUF/SO_RCVBUF). See issue #18643
- # for a discussion of this number).
- SOCK_MAX_SIZE = 16 * 1024 * 1024 + 1
- # decorator for skipping tests on non-IEEE 754 platforms
- requires_IEEE_754 = unittest.skipUnless(
- float.__getformat__("double").startswith("IEEE"),
- "test requires IEEE 754 doubles")
- requires_zlib = unittest.skipUnless(zlib, 'requires zlib')
- requires_gzip = unittest.skipUnless(gzip, 'requires gzip')
- requires_bz2 = unittest.skipUnless(bz2, 'requires bz2')
- requires_lzma = unittest.skipUnless(lzma, 'requires lzma')
- is_jython = sys.platform.startswith('java')
- is_android = hasattr(sys, 'getandroidapilevel')
- if sys.platform != 'win32':
- unix_shell = '/system/bin/sh' if is_android else '/bin/sh'
- else:
- unix_shell = None
- # Filename used for testing
- if os.name == 'java':
- # Jython disallows @ in module names
- TESTFN = '$test'
- else:
- TESTFN = '@test'
- # Disambiguate TESTFN for parallel testing, while letting it remain a valid
- # module name.
- TESTFN = "{}_{}_tmp".format(TESTFN, os.getpid())
- # Define the URL of a dedicated HTTP server for the network tests.
- # The URL must use clear-text HTTP: no redirection to encrypted HTTPS.
- TEST_HTTP_URL = "http://www.pythontest.net"
- # FS_NONASCII: non-ASCII character encodable by os.fsencode(),
- # or None if there is no such character.
- FS_NONASCII = None
- for character in (
- # First try printable and common characters to have a readable filename.
- # For each character, the encoding list are just example of encodings able
- # to encode the character (the list is not exhaustive).
- # U+00E6 (Latin Small Letter Ae): cp1252, iso-8859-1
- '\u00E6',
- # U+0130 (Latin Capital Letter I With Dot Above): cp1254, iso8859_3
- '\u0130',
- # U+0141 (Latin Capital Letter L With Stroke): cp1250, cp1257
- '\u0141',
- # U+03C6 (Greek Small Letter Phi): cp1253
- '\u03C6',
- # U+041A (Cyrillic Capital Letter Ka): cp1251
- '\u041A',
- # U+05D0 (Hebrew Letter Alef): Encodable to cp424
- '\u05D0',
- # U+060C (Arabic Comma): cp864, cp1006, iso8859_6, mac_arabic
- '\u060C',
- # U+062A (Arabic Letter Teh): cp720
- '\u062A',
- # U+0E01 (Thai Character Ko Kai): cp874
- '\u0E01',
- # Then try more "special" characters. "special" because they may be
- # interpreted or displayed differently depending on the exact locale
- # encoding and the font.
- # U+00A0 (No-Break Space)
- '\u00A0',
- # U+20AC (Euro Sign)
- '\u20AC',
- ):
- try:
- # If Python is set up to use the legacy 'mbcs' in Windows,
- # 'replace' error mode is used, and encode() returns b'?'
- # for characters missing in the ANSI codepage
- if os.fsdecode(os.fsencode(character)) != character:
- raise UnicodeError
- except UnicodeError:
- pass
- else:
- FS_NONASCII = character
- break
- # TESTFN_UNICODE is a non-ascii filename
- TESTFN_UNICODE = TESTFN + "-\xe0\xf2\u0258\u0141\u011f"
- if sys.platform == 'darwin':
- # In Mac OS X's VFS API file names are, by definition, canonically
- # decomposed Unicode, encoded using UTF-8. See QA1173:
- # http://developer.apple.com/mac/library/qa/qa2001/qa1173.html
- import unicodedata
- TESTFN_UNICODE = unicodedata.normalize('NFD', TESTFN_UNICODE)
- TESTFN_ENCODING = sys.getfilesystemencoding()
- # TESTFN_UNENCODABLE is a filename (str type) that should *not* be able to be
- # encoded by the filesystem encoding (in strict mode). It can be None if we
- # cannot generate such filename.
- TESTFN_UNENCODABLE = None
- if os.name == 'nt':
- # skip win32s (0) or Windows 9x/ME (1)
- if sys.getwindowsversion().platform >= 2:
- # Different kinds of characters from various languages to minimize the
- # probability that the whole name is encodable to MBCS (issue #9819)
- TESTFN_UNENCODABLE = TESTFN + "-\u5171\u0141\u2661\u0363\uDC80"
- try:
- TESTFN_UNENCODABLE.encode(TESTFN_ENCODING)
- except UnicodeEncodeError:
- pass
- else:
- print('WARNING: The filename %r CAN be encoded by the filesystem encoding (%s). '
- 'Unicode filename tests may not be effective'
- % (TESTFN_UNENCODABLE, TESTFN_ENCODING))
- TESTFN_UNENCODABLE = None
- # Mac OS X denies unencodable filenames (invalid utf-8)
- elif sys.platform != 'darwin':
- try:
- # ascii and utf-8 cannot encode the byte 0xff
- b'\xff'.decode(TESTFN_ENCODING)
- except UnicodeDecodeError:
- # 0xff will be encoded using the surrogate character u+DCFF
- TESTFN_UNENCODABLE = TESTFN \
- + b'-\xff'.decode(TESTFN_ENCODING, 'surrogateescape')
- else:
- # File system encoding (eg. ISO-8859-* encodings) can encode
- # the byte 0xff. Skip some unicode filename tests.
- pass
- # TESTFN_UNDECODABLE is a filename (bytes type) that should *not* be able to be
- # decoded from the filesystem encoding (in strict mode). It can be None if we
- # cannot generate such filename (ex: the latin1 encoding can decode any byte
- # sequence). On UNIX, TESTFN_UNDECODABLE can be decoded by os.fsdecode() thanks
- # to the surrogateescape error handler (PEP 383), but not from the filesystem
- # encoding in strict mode.
- TESTFN_UNDECODABLE = None
- for name in (
- # b'\xff' is not decodable by os.fsdecode() with code page 932. Windows
- # accepts it to create a file or a directory, or don't accept to enter to
- # such directory (when the bytes name is used). So test b'\xe7' first: it is
- # not decodable from cp932.
- b'\xe7w\xf0',
- # undecodable from ASCII, UTF-8
- b'\xff',
- # undecodable from iso8859-3, iso8859-6, iso8859-7, cp424, iso8859-8, cp856
- # and cp857
- b'\xae\xd5'
- # undecodable from UTF-8 (UNIX and Mac OS X)
- b'\xed\xb2\x80', b'\xed\xb4\x80',
- # undecodable from shift_jis, cp869, cp874, cp932, cp1250, cp1251, cp1252,
- # cp1253, cp1254, cp1255, cp1257, cp1258
- b'\x81\x98',
- ):
- try:
- name.decode(TESTFN_ENCODING)
- except UnicodeDecodeError:
- TESTFN_UNDECODABLE = os.fsencode(TESTFN) + name
- break
- if FS_NONASCII:
- TESTFN_NONASCII = TESTFN + '-' + FS_NONASCII
- else:
- TESTFN_NONASCII = None
- # Save the initial cwd
- SAVEDCWD = os.getcwd()
- # Set by libregrtest/main.py so we can skip tests that are not
- # useful for PGO
- PGO = False
- @contextlib.contextmanager
- def temp_dir(path=None, quiet=False):
- """Return a context manager that creates a temporary directory.
- Arguments:
- path: the directory to create temporarily. If omitted or None,
- defaults to creating a temporary directory using tempfile.mkdtemp.
- quiet: if False (the default), the context manager raises an exception
- on error. Otherwise, if the path is specified and cannot be
- created, only a warning is issued.
- """
- dir_created = False
- if path is None:
- path = tempfile.mkdtemp()
- dir_created = True
- path = os.path.realpath(path)
- else:
- try:
- os.mkdir(path)
- dir_created = True
- except OSError as exc:
- if not quiet:
- raise
- warnings.warn(f'tests may fail, unable to create '
- f'temporary directory {path!r}: {exc}',
- RuntimeWarning, stacklevel=3)
- if dir_created:
- pid = os.getpid()
- try:
- yield path
- finally:
- # In case the process forks, let only the parent remove the
- # directory. The child has a different process id. (bpo-30028)
- if dir_created and pid == os.getpid():
- rmtree(path)
- @contextlib.contextmanager
- def change_cwd(path, quiet=False):
- """Return a context manager that changes the current working directory.
- Arguments:
- path: the directory to use as the temporary current working directory.
- quiet: if False (the default), the context manager raises an exception
- on error. Otherwise, it issues only a warning and keeps the current
- working directory the same.
- """
- saved_dir = os.getcwd()
- try:
- os.chdir(path)
- except OSError as exc:
- if not quiet:
- raise
- warnings.warn(f'tests may fail, unable to change the current working '
- f'directory to {path!r}: {exc}',
- RuntimeWarning, stacklevel=3)
- try:
- yield os.getcwd()
- finally:
- os.chdir(saved_dir)
- @contextlib.contextmanager
- def temp_cwd(name='tempcwd', quiet=False):
- """
- Context manager that temporarily creates and changes the CWD.
- The function temporarily changes the current working directory
- after creating a temporary directory in the current directory with
- name *name*. If *name* is None, the temporary directory is
- created using tempfile.mkdtemp.
- If *quiet* is False (default) and it is not possible to
- create or change the CWD, an error is raised. If *quiet* is True,
- only a warning is raised and the original CWD is used.
- """
- with temp_dir(path=name, quiet=quiet) as temp_path:
- with change_cwd(temp_path, quiet=quiet) as cwd_dir:
- yield cwd_dir
- if hasattr(os, "umask"):
- @contextlib.contextmanager
- def temp_umask(umask):
- """Context manager that temporarily sets the process umask."""
- oldmask = os.umask(umask)
- try:
- yield
- finally:
- os.umask(oldmask)
- # TEST_HOME_DIR refers to the top level directory of the "test" package
- # that contains Python's regression test suite
- TEST_SUPPORT_DIR = os.path.dirname(os.path.abspath(__file__))
- TEST_HOME_DIR = os.path.dirname(TEST_SUPPORT_DIR)
- # TEST_DATA_DIR is used as a target download location for remote resources
- TEST_DATA_DIR = os.path.join(TEST_HOME_DIR, "data")
- def findfile(filename, subdir=None):
- """Try to find a file on sys.path or in the test directory. If it is not
- found the argument passed to the function is returned (this does not
- necessarily signal failure; could still be the legitimate path).
- Setting *subdir* indicates a relative path to use to find the file
- rather than looking directly in the path directories.
- """
- if os.path.isabs(filename):
- return filename
- if subdir is not None:
- filename = os.path.join(subdir, filename)
- path = [TEST_HOME_DIR] + sys.path
- for dn in path:
- fn = os.path.join(dn, filename)
- if os.path.exists(fn): return fn
- return filename
- def create_empty_file(filename):
- """Create an empty file. If the file already exists, truncate it."""
- fd = os.open(filename, os.O_WRONLY | os.O_CREAT | os.O_TRUNC)
- os.close(fd)
- def sortdict(dict):
- "Like repr(dict), but in sorted order."
- items = sorted(dict.items())
- reprpairs = ["%r: %r" % pair for pair in items]
- withcommas = ", ".join(reprpairs)
- return "{%s}" % withcommas
- def make_bad_fd():
- """
- Create an invalid file descriptor by opening and closing a file and return
- its fd.
- """
- file = open(TESTFN, "wb")
- try:
- return file.fileno()
- finally:
- file.close()
- unlink(TESTFN)
- def check_syntax_error(testcase, statement, *, lineno=None, offset=None):
- with testcase.assertRaises(SyntaxError) as cm:
- compile(statement, '<test string>', 'exec')
- err = cm.exception
- testcase.assertIsNotNone(err.lineno)
- if lineno is not None:
- testcase.assertEqual(err.lineno, lineno)
- testcase.assertIsNotNone(err.offset)
- if offset is not None:
- testcase.assertEqual(err.offset, offset)
- def open_urlresource(url, *args, **kw):
- import urllib.request, urllib.parse
- check = kw.pop('check', None)
- filename = urllib.parse.urlparse(url)[2].split('/')[-1] # '/': it's URL!
- fn = os.path.join(TEST_DATA_DIR, filename)
- def check_valid_file(fn):
- f = open(fn, *args, **kw)
- if check is None:
- return f
- elif check(f):
- f.seek(0)
- return f
- f.close()
- if os.path.exists(fn):
- f = check_valid_file(fn)
- if f is not None:
- return f
- unlink(fn)
- # Verify the requirement before downloading the file
- requires('urlfetch')
- if verbose:
- print('\tfetching %s ...' % url, file=get_original_stdout())
- opener = urllib.request.build_opener()
- if gzip:
- opener.addheaders.append(('Accept-Encoding', 'gzip'))
- f = opener.open(url, timeout=15)
- if gzip and f.headers.get('Content-Encoding') == 'gzip':
- f = gzip.GzipFile(fileobj=f)
- try:
- with open(fn, "wb") as out:
- s = f.read()
- while s:
- out.write(s)
- s = f.read()
- finally:
- f.close()
- f = check_valid_file(fn)
- if f is not None:
- return f
- raise TestFailed('invalid resource %r' % fn)
- class WarningsRecorder(object):
- """Convenience wrapper for the warnings list returned on
- entry to the warnings.catch_warnings() context manager.
- """
- def __init__(self, warnings_list):
- self._warnings = warnings_list
- self._last = 0
- def __getattr__(self, attr):
- if len(self._warnings) > self._last:
- return getattr(self._warnings[-1], attr)
- elif attr in warnings.WarningMessage._WARNING_DETAILS:
- return None
- raise AttributeError("%r has no attribute %r" % (self, attr))
- @property
- def warnings(self):
- return self._warnings[self._last:]
- def reset(self):
- self._last = len(self._warnings)
- def _filterwarnings(filters, quiet=False):
- """Catch the warnings, then check if all the expected
- warnings have been raised and re-raise unexpected warnings.
- If 'quiet' is True, only re-raise the unexpected warnings.
- """
- # Clear the warning registry of the calling module
- # in order to re-raise the warnings.
- frame = sys._getframe(2)
- registry = frame.f_globals.get('__warningregistry__')
- if registry:
- registry.clear()
- with warnings.catch_warnings(record=True) as w:
- # Set filter "always" to record all warnings. Because
- # test_warnings swap the module, we need to look up in
- # the sys.modules dictionary.
- sys.modules['warnings'].simplefilter("always")
- yield WarningsRecorder(w)
- # Filter the recorded warnings
- reraise = list(w)
- missing = []
- for msg, cat in filters:
- seen = False
- for w in reraise[:]:
- warning = w.message
- # Filter out the matching messages
- if (re.match(msg, str(warning), re.I) and
- issubclass(warning.__class__, cat)):
- seen = True
- reraise.remove(w)
- if not seen and not quiet:
- # This filter caught nothing
- missing.append((msg, cat.__name__))
- if reraise:
- raise AssertionError("unhandled warning %s" % reraise[0])
- if missing:
- raise AssertionError("filter (%r, %s) did not catch any warning" %
- missing[0])
- @contextlib.contextmanager
- def check_warnings(*filters, **kwargs):
- """Context manager to silence warnings.
- Accept 2-tuples as positional arguments:
- ("message regexp", WarningCategory)
- Optional argument:
- - if 'quiet' is True, it does not fail if a filter catches nothing
- (default True without argument,
- default False if some filters are defined)
- Without argument, it defaults to:
- check_warnings(("", Warning), quiet=True)
- """
- quiet = kwargs.get('quiet')
- if not filters:
- filters = (("", Warning),)
- # Preserve backward compatibility
- if quiet is None:
- quiet = True
- return _filterwarnings(filters, quiet)
- @contextlib.contextmanager
- def check_no_resource_warning(testcase):
- """Context manager to check that no ResourceWarning is emitted.
- Usage:
- with check_no_resource_warning(self):
- f = open(...)
- ...
- del f
- You must remove the object which may emit ResourceWarning before
- the end of the context manager.
- """
- with warnings.catch_warnings(record=True) as warns:
- warnings.filterwarnings('always', category=ResourceWarning)
- yield
- gc_collect()
- testcase.assertEqual(warns, [])
- class CleanImport(object):
- """Context manager to force import to return a new module reference.
- This is useful for testing module-level behaviours, such as
- the emission of a DeprecationWarning on import.
- Use like this:
- with CleanImport("foo"):
- importlib.import_module("foo") # new reference
- """
- def __init__(self, *module_names):
- self.original_modules = sys.modules.copy()
- for module_name in module_names:
- if module_name in sys.modules:
- module = sys.modules[module_name]
- # It is possible that module_name is just an alias for
- # another module (e.g. stub for modules renamed in 3.x).
- # In that case, we also need delete the real module to clear
- # the import cache.
- if module.__name__ != module_name:
- del sys.modules[module.__name__]
- del sys.modules[module_name]
- def __enter__(self):
- return self
- def __exit__(self, *ignore_exc):
- sys.modules.update(self.original_modules)
- class EnvironmentVarGuard(collections.abc.MutableMapping):
- """Class to help protect the environment variable properly. Can be used as
- a context manager."""
- def __init__(self):
- self._environ = os.environ
- self._changed = {}
- def __getitem__(self, envvar):
- return self._environ[envvar]
- def __setitem__(self, envvar, value):
- # Remember the initial value on the first access
- if envvar not in self._changed:
- self._changed[envvar] = self._environ.get(envvar)
- self._environ[envvar] = value
- def __delitem__(self, envvar):
- # Remember the initial value on the first access
- if envvar not in self._changed:
- self._changed[envvar] = self._environ.get(envvar)
- if envvar in self._environ:
- del self._environ[envvar]
- def keys(self):
- return self._environ.keys()
- def __iter__(self):
- return iter(self._environ)
- def __len__(self):
- return len(self._environ)
- def set(self, envvar, value):
- self[envvar] = value
- def unset(self, envvar):
- del self[envvar]
- def __enter__(self):
- return self
- def __exit__(self, *ignore_exc):
- for (k, v) in self._changed.items():
- if v is None:
- if k in self._environ:
- del self._environ[k]
- else:
- self._environ[k] = v
- os.environ = self._environ
- class DirsOnSysPath(object):
- """Context manager to temporarily add directories to sys.path.
- This makes a copy of sys.path, appends any directories given
- as positional arguments, then reverts sys.path to the copied
- settings when the context ends.
- Note that *all* sys.path modifications in the body of the
- context manager, including replacement of the object,
- will be reverted at the end of the block.
- """
- def __init__(self, *paths):
- self.original_value = sys.path[:]
- self.original_object = sys.path
- sys.path.extend(paths)
- def __enter__(self):
- return self
- def __exit__(self, *ignore_exc):
- sys.path = self.original_object
- sys.path[:] = self.original_value
- class TransientResource(object):
- """Raise ResourceDenied if an exception is raised while the context manager
- is in effect that matches the specified exception and attributes."""
- def __init__(self, exc, **kwargs):
- self.exc = exc
- self.attrs = kwargs
- def __enter__(self):
- return self
- def __exit__(self, type_=None, value=None, traceback=None):
- """If type_ is a subclass of self.exc and value has attributes matching
- self.attrs, raise ResourceDenied. Otherwise let the exception
- propagate (if any)."""
- if type_ is not None and issubclass(self.exc, type_):
- for attr, attr_value in self.attrs.items():
- if not hasattr(value, attr):
- break
- if getattr(value, attr) != attr_value:
- break
- else:
- raise ResourceDenied("an optional resource is not available")
- # Context managers that raise ResourceDenied when various issues
- # with the Internet connection manifest themselves as exceptions.
- # XXX deprecate these and use transient_internet() instead
- time_out = TransientResource(OSError, errno=errno.ETIMEDOUT)
- socket_peer_reset = TransientResource(OSError, errno=errno.ECONNRESET)
- ioerror_peer_reset = TransientResource(OSError, errno=errno.ECONNRESET)
- def get_socket_conn_refused_errs():
- """
- Get the different socket error numbers ('errno') which can be received
- when a connection is refused.
- """
- errors = [errno.ECONNREFUSED]
- if hasattr(errno, 'ENETUNREACH'):
- # On Solaris, ENETUNREACH is returned sometimes instead of ECONNREFUSED
- errors.append(errno.ENETUNREACH)
- if hasattr(errno, 'EADDRNOTAVAIL'):
- # bpo-31910: socket.create_connection() fails randomly
- # with EADDRNOTAVAIL on Travis CI
- errors.append(errno.EADDRNOTAVAIL)
- if hasattr(errno, 'EHOSTUNREACH'):
- # bpo-37583: The destination host cannot be reached
- errors.append(errno.EHOSTUNREACH)
- return errors
- @contextlib.contextmanager
- def transient_internet(resource_name, *, timeout=30.0, errnos=()):
- """Return a context manager that raises ResourceDenied when various issues
- with the Internet connection manifest themselves as exceptions."""
- default_errnos = [
- ('ECONNREFUSED', 111),
- ('ECONNRESET', 104),
- ('EHOSTUNREACH', 113),
- ('ENETUNREACH', 101),
- ('ETIMEDOUT', 110),
- # socket.create_connection() fails randomly with
- # EADDRNOTAVAIL on Travis CI.
- ('EADDRNOTAVAIL', 99),
- ]
- default_gai_errnos = [
- ('EAI_AGAIN', -3),
- ('EAI_FAIL', -4),
- ('EAI_NONAME', -2),
- ('EAI_NODATA', -5),
- # Encountered when trying to resolve IPv6-only hostnames
- ('WSANO_DATA', 11004),
- ]
- denied = ResourceDenied("Resource %r is not available" % resource_name)
- captured_errnos = errnos
- gai_errnos = []
- if not captured_errnos:
- captured_errnos = [getattr(errno, name, num)
- for (name, num) in default_errnos]
- gai_errnos = [getattr(socket, name, num)
- for (name, num) in default_gai_errnos]
- def filter_error(err):
- n = getattr(err, 'errno', None)
- if (isinstance(err, socket.timeout) or
- (isinstance(err, socket.gaierror) and n in gai_errnos) or
- (isinstance(err, urllib.error.HTTPError) and
- 500 <= err.code <= 599) or
- (isinstance(err, urllib.error.URLError) and
- (("ConnectionRefusedError" in err.reason) or
- ("TimeoutError" in err.reason) or
- ("EOFError" in err.reason))) or
- n in captured_errnos):
- if not verbose:
- sys.stderr.write(denied.args[0] + "\n")
- raise denied from err
- old_timeout = socket.getdefaulttimeout()
- try:
- if timeout is not None:
- socket.setdefaulttimeout(timeout)
- yield
- except nntplib.NNTPTemporaryError as err:
- if verbose:
- sys.stderr.write(denied.args[0] + "\n")
- raise denied from err
- except OSError as err:
- # urllib can wrap original socket errors multiple times (!), we must
- # unwrap to get at the original error.
- while True:
- a = err.args
- if len(a) >= 1 and isinstance(a[0], OSError):
- err = a[0]
- # The error can also be wrapped as args[1]:
- # except socket.error as msg:
- # raise OSError('socket error', msg).with_traceback(sys.exc_info()[2])
- elif len(a) >= 2 and isinstance(a[1], OSError):
- err = a[1]
- else:
- break
- filter_error(err)
- raise
- # XXX should we catch generic exceptions and look for their
- # __cause__ or __context__?
- finally:
- socket.setdefaulttimeout(old_timeout)
- @contextlib.contextmanager
- def captured_output(stream_name):
- """Return a context manager used by captured_stdout/stdin/stderr
- that temporarily replaces the sys stream *stream_name* with a StringIO."""
- import io
- orig_stdout = getattr(sys, stream_name)
- setattr(sys, stream_name, io.StringIO())
- try:
- yield getattr(sys, stream_name)
- finally:
- setattr(sys, stream_name, orig_stdout)
- def captured_stdout():
- """Capture the output of sys.stdout:
- with captured_stdout() as stdout:
- print("hello")
- self.assertEqual(stdout.getvalue(), "hello\\n")
- """
- return captured_output("stdout")
- def captured_stderr():
- """Capture the output of sys.stderr:
- with captured_stderr() as stderr:
- print("hello", file=sys.stderr)
- self.assertEqual(stderr.getvalue(), "hello\\n")
- """
- return captured_output("stderr")
- def captured_stdin():
- """Capture the input to sys.stdin:
- with captured_stdin() as stdin:
- stdin.write('hello\\n')
- stdin.seek(0)
- # call test code that consumes from sys.stdin
- captured = input()
- self.assertEqual(captured, "hello")
- """
- return captured_output("stdin")
- def gc_collect():
- """Force as many objects as possible to be collected.
- In non-CPython implementations of Python, this is needed because timely
- deallocation is not guaranteed by the garbage collector. (Even in CPython
- this can be the case in case of reference cycles.) This means that __del__
- methods may be called later than expected and weakrefs may remain alive for
- longer than expected. This function tries its best to force all garbage
- objects to disappear.
- """
- gc.collect()
- if is_jython:
- time.sleep(0.1)
- gc.collect()
- gc.collect()
- @contextlib.contextmanager
- def disable_gc():
- have_gc = gc.isenabled()
- gc.disable()
- try:
- yield
- finally:
- if have_gc:
- gc.enable()
- def python_is_optimized():
- """Find if Python was built with optimizations."""
- cflags = sysconfig.get_config_var('PY_CFLAGS') or ''
- final_opt = ""
- for opt in cflags.split():
- if opt.startswith('-O'):
- final_opt = opt
- return final_opt not in ('', '-O0', '-Og')
- _header = 'nP'
- _align = '0n'
- if hasattr(sys, "gettotalrefcount"):
- _header = '2P' + _header
- _align = '0P'
- _vheader = _header + 'n'
- def calcobjsize(fmt):
- return struct.calcsize(_header + fmt + _align)
- def calcvobjsize(fmt):
- return struct.calcsize(_vheader + fmt + _align)
- _TPFLAGS_HAVE_GC = 1<<14
- _TPFLAGS_HEAPTYPE = 1<<9
- def check_sizeof(test, o, size):
- import _testcapi
- result = sys.getsizeof(o)
- # add GC header size
- if ((type(o) == type) and (o.__flags__ & _TPFLAGS_HEAPTYPE) or\
- ((type(o) != type) and (type(o).__flags__ & _TPFLAGS_HAVE_GC))):
- size += _testcapi.SIZEOF_PYGC_HEAD
- msg = 'wrong size for %s: got %d, expected %d' \
- % (type(o), result, size)
- test.assertEqual(result, size, msg)
- #=======================================================================
- # Decorator for running a function in a different locale, correctly resetting
- # it afterwards.
- def run_with_locale(catstr, *locales):
- def decorator(func):
- def inner(*args, **kwds):
- try:
- import locale
- category = getattr(locale, catstr)
- orig_locale = locale.setlocale(category)
- except AttributeError:
- # if the test author gives us an invalid category string
- raise
- except:
- # cannot retrieve original locale, so do nothing
- locale = orig_locale = None
- else:
- for loc in locales:
- try:
- locale.setlocale(category, loc)
- break
- except:
- pass
- # now run the function, resetting the locale on exceptions
- try:
- return func(*args, **kwds)
- finally:
- if locale and orig_locale:
- locale.setlocale(category, orig_locale)
- inner.__name__ = func.__name__
- inner.__doc__ = func.__doc__
- return inner
- return decorator
- #=======================================================================
- # Decorator for running a function in a specific timezone, correctly
- # resetting it afterwards.
- def run_with_tz(tz):
- def decorator(func):
- def inner(*args, **kwds):
- try:
- tzset = time.tzset
- except AttributeError:
- raise unittest.SkipTest("tzset required")
- if 'TZ' in os.environ:
- orig_tz = os.environ['TZ']
- else:
- orig_tz = None
- os.environ['TZ'] = tz
- tzset()
- # now run the function, resetting the tz on exceptions
- try:
- return func(*args, **kwds)
- finally:
- if orig_tz is None:
- del os.environ['TZ']
- else:
- os.environ['TZ'] = orig_tz
- time.tzset()
- inner.__name__ = func.__name__
- inner.__doc__ = func.__doc__
- return inner
- return decorator
- #=======================================================================
- # Big-memory-test support. Separate from 'resources' because memory use
- # should be configurable.
- # Some handy shorthands. Note that these are used for byte-limits as well
- # as size-limits, in the various bigmem tests
- _1M = 1024*1024
- _1G = 1024 * _1M
- _2G = 2 * _1G
- _4G = 4 * _1G
- MAX_Py_ssize_t = sys.maxsize
- def set_memlimit(limit):
- global max_memuse
- global real_max_memuse
- sizes = {
- 'k': 1024,
- 'm': _1M,
- 'g': _1G,
- 't': 1024*_1G,
- }
- m = re.match(r'(\d+(\.\d+)?) (K|M|G|T)b?$', limit,
- re.IGNORECASE | re.VERBOSE)
- if m is None:
- raise ValueError('Invalid memory limit %r' % (limit,))
- memlimit = int(float(m.group(1)) * sizes[m.group(3).lower()])
- real_max_memuse = memlimit
- if memlimit > MAX_Py_ssize_t:
- memlimit = MAX_Py_ssize_t
- if memlimit < _2G - 1:
- raise ValueError('Memory limit %r too low to be useful' % (limit,))
- max_memuse = memlimit
- class _MemoryWatchdog:
- """An object which periodically watches the process' memory consumption
- and prints it out.
- """
- def __init__(self):
- self.procfile = '/proc/{pid}/statm'.format(pid=os.getpid())
- self.started = False
- def start(self):
- try:
- f = open(self.procfile, 'r')
- except OSError as e:
- warnings.warn('/proc not available for stats: {}'.format(e),
- RuntimeWarning)
- sys.stderr.flush()
- return
- watchdog_script = findfile("memory_watchdog.py")
- self.mem_watchdog = subprocess.Popen([sys.executable, watchdog_script],
- stdin=f, stderr=subprocess.DEVNULL)
- f.close()
- self.started = True
- def stop(self):
- if self.started:
- self.mem_watchdog.terminate()
- self.mem_watchdog.wait()
- def bigmemtest(size, memuse, dry_run=True):
- """Decorator for bigmem tests.
- 'size' is a requested size for the test (in arbitrary, test-interpreted
- units.) 'memuse' is the number of bytes per unit for the test, or a good
- estimate of it. For example, a test that needs two byte buffers, of 4 GiB
- each, could be decorated with @bigmemtest(size=_4G, memuse=2).
- The 'size' argument is normally passed to the decorated test method as an
- extra argument. If 'dry_run' is true, the value passed to the test method
- may be less than the requested value. If 'dry_run' is false, it means the
- test doesn't support dummy runs when -M is not specified.
- """
- def decorator(f):
- def wrapper(self):
- size = wrapper.size
- memuse = wrapper.memuse
- if not real_max_memuse:
- maxsize = 5147
- else:
- maxsize = size
- if ((real_max_memuse or not dry_run)
- and real_max_memuse < maxsize * memuse):
- raise unittest.SkipTest(
- "not enough memory: %.1fG minimum needed"
- % (size * memuse / (1024 ** 3)))
- if real_max_memuse and verbose:
- print()
- print(" ... expected peak memory use: {peak:.1f}G"
- .format(peak=size * memuse / (1024 ** 3)))
- watchdog = _MemoryWatchdog()
- watchdog.start()
- else:
- watchdog = None
- try:
- return f(self, maxsize)
- finally:
- if watchdog:
- watchdog.stop()
- wrapper.size = size
- wrapper.memuse = memuse
- return wrapper
- return decorator
- def bigaddrspacetest(f):
- """Decorator for tests that fill the address space."""
- def wrapper(self):
- if max_memuse < MAX_Py_ssize_t:
- if MAX_Py_ssize_t >= 2**63 - 1 and max_memuse >= 2**31:
- raise unittest.SkipTest(
- "not enough memory: try a 32-bit build instead")
- else:
- raise unittest.SkipTest(
- "not enough memory: %.1fG minimum needed"
- % (MAX_Py_ssize_t / (1024 ** 3)))
- else:
- return f(self)
- return wrapper
- #=======================================================================
- # unittest integration.
- class BasicTestRunner:
- def run(self, test):
- result = unittest.TestResult()
- test(result)
- return result
- def _id(obj):
- return obj
- def requires_resource(resource):
- if resource == 'gui' and not _is_gui_available():
- return unittest.skip(_is_gui_available.reason)
- if is_resource_enabled(resource):
- return _id
- else:
- return unittest.skip("resource {0!r} is not enabled".format(resource))
- def cpython_only(test):
- """
- Decorator for tests only applicable on CPython.
- """
- return impl_detail(cpython=True)(test)
- def impl_detail(msg=None, **guards):
- if check_impl_detail(**guards):
- return _id
- if msg is None:
- guardnames, default = _parse_guards(guards)
- if default:
- msg = "implementation detail not available on {0}"
- else:
- msg = "implementation detail specific to {0}"
- guardnames = sorted(guardnames.keys())
- msg = msg.format(' or '.join(guardnames))
- return unittest.skip(msg)
- def _parse_guards(guards):
- # Returns a tuple ({platform_name: run_me}, default_value)
- if not guards:
- return ({'cpython': True}, False)
- is_true = list(guards.values())[0]
- assert list(guards.values()) == [is_true] * len(guards) # all True or all False
- return (guards, not is_true)
- # Use the following check to guard CPython's implementation-specific tests --
- # or to run them only on the implementation(s) guarded by the arguments.
- def check_impl_detail(**guards):
- """This function returns True or False depending on the host platform.
- Examples:
- if check_impl_detail(): # only on CPython (default)
- if check_impl_detail(jython=True): # only on Jython
- if check_impl_detail(cpython=False): # everywhere except on CPython
- """
- guards, default = _parse_guards(guards)
- return guards.get(platform.python_implementation().lower(), default)
- def no_tracing(func):
- """Decorator to temporarily turn off tracing for the duration of a test."""
- if not hasattr(sys, 'gettrace'):
- return func
- else:
- @functools.wraps(func)
- def wrapper(*args, **kwargs):
- original_trace = sys.gettrace()
- try:
- sys.settrace(None)
- return func(*args, **kwargs)
- finally:
- sys.settrace(original_trace)
- return wrapper
- def refcount_test(test):
- """Decorator for tests which involve reference counting.
- To start, the decorator does not run the test if is not run by CPython.
- After that, any trace function is unset during the test to prevent
- unexpected refcounts caused by the trace function.
- """
- return no_tracing(cpython_only(test))
- def _filter_suite(suite, pred):
- """Recursively filter test cases in a suite based on a predicate."""
- newtests = []
- for test in suite._tests:
- if isinstance(test, unittest.TestSuite):
- _filter_suite(test, pred)
- newtests.append(test)
- else:
- if pred(test):
- newtests.append(test)
- suite._tests = newtests
- def _run_suite(suite):
- """Run tests from a unittest.TestSuite-derived class."""
- runner = get_test_runner(sys.stdout,
- verbosity=verbose,
- capture_output=(junit_xml_list is not None))
- result = runner.run(suite)
- if junit_xml_list is not None:
- junit_xml_list.append(result.get_xml_element())
- if not result.testsRun and not result.skipped:
- raise TestDidNotRun
- if not result.wasSuccessful():
- if len(result.errors) == 1 and not result.failures:
- err = result.errors[0][1]
- elif len(result.failures) == 1 and not result.errors:
- err = result.failures[0][1]
- else:
- err = "multiple errors occurred"
- if not verbose: err += "; run in verbose mode for details"
- raise TestFailed(err)
- # By default, don't filter tests
- _match_test_func = None
- _accept_test_patterns = None
- _ignore_test_patterns = None
- def match_test(test):
- # Function used by support.run_unittest() and regrtest --list-cases
- if _match_test_func is None:
- return True
- else:
- return _match_test_func(test.id())
- def _is_full_match_test(pattern):
- # If a pattern contains at least one dot, it's considered
- # as a full test identifier.
- # Example: 'test.test_os.FileTests.test_access'.
- #
- # ignore patterns which contain fnmatch patterns: '*', '?', '[...]'
- # or '[!...]'. For example, ignore 'test_access*'.
- return ('.' in pattern) and (not re.search(r'[?*\[\]]', pattern))
- def set_match_tests(accept_patterns=None, ignore_patterns=None):
- global _match_test_func, _accept_test_patterns, _ignore_test_patterns
- if accept_patterns is None:
- accept_patterns = ()
- if ignore_patterns is None:
- ignore_patterns = ()
- accept_func = ignore_func = None
- if accept_patterns != _accept_test_patterns:
- accept_patterns, accept_func = _compile_match_function(accept_patterns)
- if ignore_patterns != _ignore_test_patterns:
- ignore_patterns, ignore_func = _compile_match_function(ignore_patterns)
- # Create a copy since patterns can be mutable and so modified later
- _accept_test_patterns = tuple(accept_patterns)
- _ignore_test_patterns = tuple(ignore_patterns)
- if accept_func is not None or ignore_func is not None:
- def match_function(test_id):
- accept = True
- ignore = False
- if accept_func:
- accept = accept_func(test_id)
- if ignore_func:
- ignore = ignore_func(test_id)
- return accept and not ignore
- _match_test_func = match_function
- def _compile_match_function(patterns):
- if not patterns:
- func = None
- # set_match_tests(None) behaves as set_match_tests(())
- patterns = ()
- elif all(map(_is_full_match_test, patterns)):
- # Simple case: all patterns are full test identifier.
- # The test.bisect_cmd utility only uses such full test identifiers.
- func = set(patterns).__contains__
- else:
- regex = '|'.join(map(fnmatch.translate, patterns))
- # The search *is* case sensitive on purpose:
- # don't use flags=re.IGNORECASE
- regex_match = re.compile(regex).match
- def match_test_regex(test_id):
- if regex_match(test_id):
- # The regex matchs the whole identifier like
- # 'test.test_os.FileTests.test_access'
- return True
- else:
- # Try to match parts of the test identifier.
- # For example, split 'test.test_os.FileTests.test_access'
- # into: 'test', 'test_os', 'FileTests' and 'test_access'.
- return any(map(regex_match, test_id.split(".")))
- func = match_test_regex
- return patterns, func
- def run_unittest(*classes):
- """Run tests from unittest.TestCase-derived classes."""
- valid_types = (unittest.TestSuite, unittest.TestCase)
- suite = unittest.TestSuite()
- for cls in classes:
- if isinstance(cls, str):
- if cls in sys.modules:
- suite.addTest(unittest.findTestCases(sys.modules[cls]))
- else:
- raise ValueError("str arguments must be keys in sys.modules")
- elif isinstance(cls, valid_types):
- suite.addTest(cls)
- else:
- suite.addTest(unittest.makeSuite(cls))
- _filter_suite(suite, match_test)
- _run_suite(suite)
- #=======================================================================
- # Check for the presence of docstrings.
- # Rather than trying to enumerate all the cases where docstrings may be
- # disabled, we just check for that directly
- def _check_docstrings():
- """Just used to check if docstrings are enabled"""
- MISSING_C_DOCSTRINGS = (check_impl_detail() and
- sys.platform != 'win32' and
- not sysconfig.get_config_var('WITH_DOC_STRINGS'))
- HAVE_DOCSTRINGS = (_check_docstrings.__doc__ is not None and
- not MISSING_C_DOCSTRINGS)
- requires_docstrings = unittest.skipUnless(HAVE_DOCSTRINGS,
- "test requires docstrings")
- #=======================================================================
- # doctest driver.
- def run_doctest(module, verbosity=None, optionflags=0):
- """Run doctest on the given module. Return (#failures, #tests).
- If optional argument verbosity is not specified (or is None), pass
- support's belief about verbosity on to doctest. Else doctest's
- usual behavior is used (it searches sys.argv for -v).
- """
- import doctest
- if verbosity is None:
- verbosity = verbose
- else:
- verbosity = None
- f, t = doctest.testmod(module, verbose=verbosity, optionflags=optionflags)
- if f:
- raise TestFailed("%d of %d doctests failed" % (f, t))
- if verbose:
- print('doctest (%s) ... %d tests with zero failures' %
- (module.__name__, t))
- return f, t
- #=======================================================================
- # Support for saving and restoring the imported modules.
- def modules_setup():
- return sys.modules.copy(),
- def modules_cleanup(oldmodules):
- # Encoders/decoders are registered permanently within the internal
- # codec cache. If we destroy the corresponding modules their
- # globals will be set to None which will trip up the cached functions.
- encodings = [(k, v) for k, v in sys.modules.items()
- if k.startswith('encodings.')]
- sys.modules.clear()
- sys.modules.update(encodings)
- # XXX: This kind of problem can affect more than just encodings. In particular
- # extension modules (such as _ssl) don't cope with reloading properly.
- # Really, test modules should be cleaning out the test specific modules they
- # know they added (ala test_runpy) rather than relying on this function (as
- # test_importhooks and test_pkg do currently).
- # Implicitly imported *real* modules should be left alone (see issue 10556).
- sys.modules.update(oldmodules)
- #=======================================================================
- # Threading support to prevent reporting refleaks when running regrtest.py -R
- # Flag used by saved_test_environment of test.libregrtest.save_env,
- # to check if a test modified the environment. The flag should be set to False
- # before running a new test.
- #
- # For example, threading_cleanup() sets the flag is the function fails
- # to cleanup threads.
- environment_altered = False
- # NOTE: we use thread._count() rather than threading.enumerate() (or the
- # moral equivalent thereof) because a threading.Thread object is still alive
- # until its __bootstrap() method has returned, even after it has been
- # unregistered from the threading module.
- # thread._count(), on the other hand, only gets decremented *after* the
- # __bootstrap() method has returned, which gives us reliable reference counts
- # at the end of a test run.
- def threading_setup():
- return _thread._count(), threading._dangling.copy()
- def threading_cleanup(*original_values):
- global environment_altered
- _MAX_COUNT = 100
- for count in range(_MAX_COUNT):
- values = _thread._count(), threading._dangling
- if values == original_values:
- break
- if not count:
- # Display a warning at the first iteration
- environment_altered = True
- dangling_threads = values[1]
- print("Warning -- threading_cleanup() failed to cleanup "
- "%s threads (count: %s, dangling: %s)"
- % (values[0] - original_values[0],
- values[0], len(dangling_threads)),
- file=sys.stderr)
- for thread in dangling_threads:
- print(f"Dangling thread: {thread!r}", file=sys.stderr)
- sys.stderr.flush()
- # Don't hold references to threads
- dangling_threads = None
- values = None
- time.sleep(0.01)
- gc_collect()
- def reap_threads(func):
- """Use this function when threads are being used. This will
- ensure that the threads are cleaned up even when the test fails.
- """
- @functools.wraps(func)
- def decorator(*args):
- key = threading_setup()
- try:
- return func(*args)
- finally:
- threading_cleanup(*key)
- return decorator
- @contextlib.contextmanager
- def wait_threads_exit(timeout=60.0):
- """
- bpo-31234: Context manager to wait until all threads created in the with
- statement exit.
- Use _thread.count() to check if threads exited. Indirectly, wait until
- threads exit the internal t_bootstrap() C function of the _thread module.
- threading_setup() and threading_cleanup() are designed to emit a warning
- if a test leaves running threads in the background. This context manager
- is designed to cleanup threads started by the _thread.start_new_thread()
- which doesn't allow to wait for thread exit, whereas thread.Thread has a
- join() method.
- """
- old_count = _thread._count()
- try:
- yield
- finally:
- start_time = time.monotonic()
- deadline = start_time + timeout
- while True:
- count = _thread._count()
- if count <= old_count:
- break
- if time.monotonic() > deadline:
- dt = time.monotonic() - start_time
- msg = (f"wait_threads() failed to cleanup {count - old_count} "
- f"threads after {dt:.1f} seconds "
- f"(count: {count}, old count: {old_count})")
- raise AssertionError(msg)
- time.sleep(0.010)
- gc_collect()
- def join_thread(thread, timeout=30.0):
- """Join a thread. Raise an AssertionError if the thread is still alive
- after timeout seconds.
- """
- thread.join(timeout)
- if thread.is_alive():
- msg = f"failed to join the thread in {timeout:.1f} seconds"
- raise AssertionError(msg)
- def reap_children():
- """Use this function at the end of test_main() whenever sub-processes
- are started. This will help ensure that no extra children (zombies)
- stick around to hog resources and create problems when looking
- for refleaks.
- """
- global environment_altered
- # Need os.waitpid(-1, os.WNOHANG): Windows is not supported
- if not (hasattr(os, 'waitpid') and hasattr(os, 'WNOHANG')):
- return
- # Reap all our dead child processes so we don't leave zombies around.
- # These hog resources and might be causing some of the buildbots to die.
- while True:
- try:
- # Read the exit status of any child process which already completed
- pid, status = os.waitpid(-1, os.WNOHANG)
- except OSError:
- break
- if pid == 0:
- break
- print("Warning -- reap_children() reaped child process %s"
- % pid, file=sys.stderr)
- environment_altered = True
- @contextlib.contextmanager
- def start_threads(threads, unlock=None):
- threads = list(threads)
- started = []
- try:
- try:
- for t in threads:
- t.start()
- started.append(t)
- except:
- if verbose:
- print("Can't start %d threads, only %d threads started" %
- (len(threads), len(started)))
- raise
- yield
- finally:
- try:
- if unlock:
- unlock()
- endtime = starttime = time.monotonic()
- for timeout in range(1, 16):
- endtime += 60
- for t in started:
- t.join(max(endtime - time.monotonic(), 0.01))
- started = [t for t in started if t.is_alive()]
- if not started:
- break
- if verbose:
- print('Unable to join %d threads during a period of '
- '%d minutes' % (len(started), timeout))
- finally:
- started = [t for t in started if t.is_alive()]
- if started:
- faulthandler.dump_traceback(sys.stdout)
- raise AssertionError('Unable to join %d threads' % len(started))
- @contextlib.contextmanager
- def swap_attr(obj, attr, new_val):
- """Temporary swap out an attribute with a new object.
- Usage:
- with swap_attr(obj, "attr", 5):
- ...
- This will set obj.attr to 5 for the duration of the with: block,
- restoring the old value at the end of the block. If `attr` doesn't
- exist on `obj`, it will be created and then deleted at the end of the
- block.
- The old value (or None if it doesn't exist) will be assigned to the
- target of the "as" clause, if there is one.
- """
- if hasattr(obj, attr):
- real_val = getattr(obj, attr)
- setattr(obj, attr, new_val)
- try:
- yield real_val
- finally:
- setattr(obj, attr, real_val)
- else:
- setattr(obj, attr, new_val)
- try:
- yield
- finally:
- if hasattr(obj, attr):
- delattr(obj, attr)
- @contextlib.contextmanager
- def swap_item(obj, item, new_val):
- """Temporary swap out an item with a new object.
- Usage:
- with swap_item(obj, "item", 5):
- ...
- This will set obj["item"] to 5 for the duration of the with: block,
- restoring the old value at the end of the block. If `item` doesn't
- exist on `obj`, it will be created and then deleted at the end of the
- block.
- The old value (or None if it doesn't exist) will be assigned to the
- target of the "as" clause, if there is one.
- """
- if item in obj:
- real_val = obj[item]
- obj[item] = new_val
- try:
- yield real_val
- finally:
- obj[item] = real_val
- else:
- obj[item] = new_val
- try:
- yield
- finally:
- if item in obj:
- del obj[item]
- def strip_python_stderr(stderr):
- """Strip the stderr of a Python process from potential debug output
- emitted by the interpreter.
- This will typically be run on the result of the communicate() method
- of a subprocess.Popen object.
- """
- stderr = re.sub(br"\[\d+ refs, \d+ blocks\]\r?\n?", b"", stderr).strip()
- return stderr
- requires_type_collecting = unittest.skipIf(hasattr(sys, 'getcounts'),
- 'types are immortal if COUNT_ALLOCS is defined')
- def args_from_interpreter_flags():
- """Return a list of command-line arguments reproducing the current
- settings in sys.flags and sys.warnoptions."""
- return subprocess._args_from_interpreter_flags()
- def optim_args_from_interpreter_flags():
- """Return a list of command-line arguments reproducing the current
- optimization settings in sys.flags."""
- return subprocess._optim_args_from_interpreter_flags()
- #============================================================
- # Support for assertions about logging.
- #============================================================
- class TestHandler(logging.handlers.BufferingHandler):
- def __init__(self, matcher):
- # BufferingHandler takes a "capacity" argument
- # so as to know when to flush. As we're overriding
- # shouldFlush anyway, we can set a capacity of zero.
- # You can call flush() manually to clear out the
- # buffer.
- logging.handlers.BufferingHandler.__init__(self, 0)
- self.matcher = matcher
- def shouldFlush(self):
- return False
- def emit(self, record):
- self.format(record)
- self.buffer.append(record.__dict__)
- def matches(self, **kwargs):
- """
- Look for a saved dict whose keys/values match the supplied arguments.
- """
- result = False
- for d in self.buffer:
- if self.matcher.matches(d, **kwargs):
- result = True
- break
- return result
- class Matcher(object):
- _partial_matches = ('msg', 'message')
- def matches(self, d, **kwargs):
- """
- Try to match a single dict with the supplied arguments.
- Keys whose values are strings and which are in self._partial_matches
- will be checked for partial (i.e. substring) matches. You can extend
- this scheme to (for example) do regular expression matching, etc.
- """
- result = True
- for k in kwargs:
- v = kwargs[k]
- dv = d.get(k)
- if not self.match_value(k, dv, v):
- result = False
- break
- return result
- def match_value(self, k, dv, v):
- """
- Try to match a single stored value (dv) with a supplied value (v).
- """
- if type(v) != type(dv):
- result = False
- elif type(dv) is not str or k not in self._partial_matches:
- result = (v == dv)
- else:
- result = dv.find(v) >= 0
- return result
- _can_symlink = None
- def can_symlink():
- global _can_symlink
- if _can_symlink is not None:
- return _can_symlink
- symlink_path = TESTFN + "can_symlink"
- try:
- os.symlink(TESTFN, symlink_path)
- can = True
- except (OSError, NotImplementedError, AttributeError):
- can = False
- else:
- os.remove(symlink_path)
- _can_symlink = can
- return can
- def skip_unless_symlink(test):
- """Skip decorator for tests that require functional symlink"""
- ok = can_symlink()
- msg = "Requires functional symlink implementation"
- return test if ok else unittest.skip(msg)(test)
- _can_xattr = None
- def can_xattr():
- global _can_xattr
- if _can_xattr is not None:
- return _can_xattr
- if not hasattr(os, "setxattr"):
- can = False
- else:
- tmp_dir = tempfile.mkdtemp()
- tmp_fp, tmp_name = tempfile.mkstemp(dir=tmp_dir)
- try:
- with open(TESTFN, "wb") as fp:
- try:
- # TESTFN & tempfile may use different file systems with
- # different capabilities
- os.setxattr(tmp_fp, b"user.test", b"")
- os.setxattr(tmp_name, b"trusted.foo", b"42")
- os.setxattr(fp.fileno(), b"user.test", b"")
- # Kernels < 2.6.39 don't respect setxattr flags.
- kernel_version = platform.release()
- m = re.match(r"2.6.(\d{1,2})", kernel_version)
- can = m is None or int(m.group(1)) >= 39
- except OSError:
- can = False
- finally:
- unlink(TESTFN)
- unlink(tmp_name)
- rmdir(tmp_dir)
- _can_xattr = can
- return can
- def skip_unless_xattr(test):
- """Skip decorator for tests that require functional extended attributes"""
- ok = can_xattr()
- msg = "no non-broken extended attribute support"
- return test if ok else unittest.skip(msg)(test)
- _bind_nix_socket_error = None
- def skip_unless_bind_unix_socket(test):
- """Decorator for tests requiring a functional bind() for unix sockets."""
- if not hasattr(socket, 'AF_UNIX'):
- return unittest.skip('No UNIX Sockets')(test)
- global _bind_nix_socket_error
- if _bind_nix_socket_error is None:
- path = TESTFN + "can_bind_unix_socket"
- with socket.socket(socket.AF_UNIX) as sock:
- try:
- sock.bind(path)
- _bind_nix_socket_error = False
- except OSError as e:
- _bind_nix_socket_error = e
- finally:
- unlink(path)
- if _bind_nix_socket_error:
- msg = 'Requires a functional unix bind(): %s' % _bind_nix_socket_error
- return unittest.skip(msg)(test)
- else:
- return test
- def fs_is_case_insensitive(directory):
- """Detects if the file system for the specified directory is case-insensitive."""
- with tempfile.NamedTemporaryFile(dir=directory) as base:
- base_path = base.name
- case_path = base_path.upper()
- if case_path == base_path:
- case_path = base_path.lower()
- try:
- return os.path.samefile(base_path, case_path)
- except FileNotFoundError:
- return False
- def detect_api_mismatch(ref_api, other_api, *, ignore=()):
- """Returns the set of items in ref_api not in other_api, except for a
- defined list of items to be ignored in this check.
- By default this skips private attributes beginning with '_' but
- includes all magic methods, i.e. those starting and ending in '__'.
- """
- missing_items = set(dir(ref_api)) - set(dir(other_api))
- if ignore:
- missing_items -= set(ignore)
- missing_items = set(m for m in missing_items
- if not m.startswith('_') or m.endswith('__'))
- return missing_items
- def check__all__(test_case, module, name_of_module=None, extra=(),
- blacklist=()):
- """Assert that the __all__ variable of 'module' contains all public names.
- The module's public names (its API) are detected automatically based on
- whether they match the public name convention and were defined in
- 'module'.
- The 'name_of_module' argument can specify (as a string or tuple thereof)
- what module(s) an API could be defined in in order to be detected as a
- public API. One case for this is when 'module' imports part of its public
- API from other modules, possibly a C backend (like 'csv' and its '_csv').
- The 'extra' argument can be a set of names that wouldn't otherwise be
- automatically detected as "public", like objects without a proper
- '__module__' attribute. If provided, it will be added to the
- automatically detected ones.
- The 'blacklist' argument can be a set of names that must not be treated
- as part of the public API even though their names indicate otherwise.
- Usage:
- import bar
- import foo
- import unittest
- from test import support
- class MiscTestCase(unittest.TestCase):
- def test__all__(self):
- support.check__all__(self, foo)
- class OtherTestCase(unittest.TestCase):
- def test__all__(self):
- extra = {'BAR_CONST', 'FOO_CONST'}
- blacklist = {'baz'} # Undocumented name.
- # bar imports part of its API from _bar.
- support.check__all__(self, bar, ('bar', '_bar'),
- extra=extra, blacklist=blacklist)
- """
- if name_of_module is None:
- name_of_module = (module.__name__, )
- elif isinstance(name_of_module, str):
- name_of_module = (name_of_module, )
- expected = set(extra)
- for name in dir(module):
- if name.startswith('_') or name in blacklist:
- continue
- obj = getattr(module, name)
- if (getattr(obj, '__module__', None) in name_of_module or
- (not hasattr(obj, '__module__') and
- not isinstance(obj, types.ModuleType))):
- expected.add(name)
- test_case.assertCountEqual(module.__all__, expected)
- class SuppressCrashReport:
- """Try to prevent a crash report from popping up.
- On Windows, don't display the Windows Error Reporting dialog. On UNIX,
- disable the creation of coredump file.
- """
- old_value = None
- old_modes = None
- def __enter__(self):
- """On Windows, disable Windows Error Reporting dialogs using
- SetErrorMode.
- On UNIX, try to save the previous core file size limit, then set
- soft limit to 0.
- """
- if sys.platform.startswith('win'):
- # see http://msdn.microsoft.com/en-us/library/windows/desktop/ms680621.aspx
- # GetErrorMode is not available on Windows XP and Windows Server 2003,
- # but SetErrorMode returns the previous value, so we can use that
- import ctypes
- self._k32 = ctypes.windll.kernel32
- SEM_NOGPFAULTERRORBOX = 0x02
- self.old_value = self._k32.SetErrorMode(SEM_NOGPFAULTERRORBOX)
- self._k32.SetErrorMode(self.old_value | SEM_NOGPFAULTERRORBOX)
- # Suppress assert dialogs in debug builds
- # (see http://bugs.python.org/issue23314)
- try:
- import msvcrt
- msvcrt.CrtSetReportMode
- except (AttributeError, ImportError):
- # no msvcrt or a release build
- pass
- else:
- self.old_modes = {}
- for report_type in [msvcrt.CRT_WARN,
- msvcrt.CRT_ERROR,
- msvcrt.CRT_ASSERT]:
- old_mode = msvcrt.CrtSetReportMode(report_type,
- msvcrt.CRTDBG_MODE_FILE)
- old_file = msvcrt.CrtSetReportFile(report_type,
- msvcrt.CRTDBG_FILE_STDERR)
- self.old_modes[report_type] = old_mode, old_file
- else:
- if resource is not None:
- try:
- self.old_value = resource.getrlimit(resource.RLIMIT_CORE)
- resource.setrlimit(resource.RLIMIT_CORE,
- (0, self.old_value[1]))
- except (ValueError, OSError):
- pass
- if sys.platform == 'darwin':
- # Check if the 'Crash Reporter' on OSX was configured
- # in 'Developer' mode and warn that it will get triggered
- # when it is.
- #
- # This assumes that this context manager is used in tests
- # that might trigger the next manager.
- cmd = ['/usr/bin/defaults', 'read',
- 'com.apple.CrashReporter', 'DialogType']
- proc = subprocess.Popen(cmd,
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE)
- with proc:
- stdout = proc.communicate()[0]
- if stdout.strip() == b'developer':
- print("this test triggers the Crash Reporter, "
- "that is intentional", end='', flush=True)
- return self
- def __exit__(self, *ignore_exc):
- """Restore Windows ErrorMode or core file behavior to initial value."""
- if self.old_value is None:
- return
- if sys.platform.startswith('win'):
- self._k32.SetErrorMode(self.old_value)
- if self.old_modes:
- import msvcrt
- for report_type, (old_mode, old_file) in self.old_modes.items():
- msvcrt.CrtSetReportMode(report_type, old_mode)
- msvcrt.CrtSetReportFile(report_type, old_file)
- else:
- if resource is not None:
- try:
- resource.setrlimit(resource.RLIMIT_CORE, self.old_value)
- except (ValueError, OSError):
- pass
- def patch(test_instance, object_to_patch, attr_name, new_value):
- """Override 'object_to_patch'.'attr_name' with 'new_value'.
- Also, add a cleanup procedure to 'test_instance' to restore
- 'object_to_patch' value for 'attr_name'.
- The 'attr_name' should be a valid attribute for 'object_to_patch'.
- """
- # check that 'attr_name' is a real attribute for 'object_to_patch'
- # will raise AttributeError if it does not exist
- getattr(object_to_patch, attr_name)
- # keep a copy of the old value
- attr_is_local = False
- try:
- old_value = object_to_patch.__dict__[attr_name]
- except (AttributeError, KeyError):
- old_value = getattr(object_to_patch, attr_name, None)
- else:
- attr_is_local = True
- # restore the value when the test is done
- def cleanup():
- if attr_is_local:
- setattr(object_to_patch, attr_name, old_value)
- else:
- delattr(object_to_patch, attr_name)
- test_instance.addCleanup(cleanup)
- # actually override the attribute
- setattr(object_to_patch, attr_name, new_value)
- def run_in_subinterp(code):
- """
- Run code in a subinterpreter. Raise unittest.SkipTest if the tracemalloc
- module is enabled.
- """
- # Issue #10915, #15751: PyGILState_*() functions don't work with
- # sub-interpreters, the tracemalloc module uses these functions internally
- try:
- import tracemalloc
- except ImportError:
- pass
- else:
- if tracemalloc.is_tracing():
- raise unittest.SkipTest("run_in_subinterp() cannot be used "
- "if tracemalloc module is tracing "
- "memory allocations")
- import _testcapi
- return _testcapi.run_in_subinterp(code)
- def check_free_after_iterating(test, iter, cls, args=()):
- class A(cls):
- def __del__(self):
- nonlocal done
- done = True
- try:
- next(it)
- except StopIteration:
- pass
- done = False
- it = iter(A(*args))
- # Issue 26494: Shouldn't crash
- test.assertRaises(StopIteration, next, it)
- # The sequence should be deallocated just after the end of iterating
- gc_collect()
- test.assertTrue(done)
- def missing_compiler_executable(cmd_names=[]):
- """Check if the compiler components used to build the interpreter exist.
- Check for the existence of the compiler executables whose names are listed
- in 'cmd_names' or all the compiler executables when 'cmd_names' is empty
- and return the first missing executable or None when none is found
- missing.
- """
- from distutils import ccompiler, sysconfig, spawn
- compiler = ccompiler.new_compiler()
- sysconfig.customize_compiler(compiler)
- for name in compiler.executables:
- if cmd_names and name not in cmd_names:
- continue
- cmd = getattr(compiler, name)
- if cmd_names:
- assert cmd is not None, \
- "the '%s' executable is not configured" % name
- elif cmd is None:
- continue
- if spawn.find_executable(cmd[0]) is None:
- return cmd[0]
- _is_android_emulator = None
- def setswitchinterval(interval):
- # Setting a very low gil interval on the Android emulator causes python
- # to hang (issue #26939).
- minimum_interval = 1e-5
- if is_android and interval < minimum_interval:
- global _is_android_emulator
- if _is_android_emulator is None:
- _is_android_emulator = (subprocess.check_output(
- ['getprop', 'ro.kernel.qemu']).strip() == b'1')
- if _is_android_emulator:
- interval = minimum_interval
- return sys.setswitchinterval(interval)
- @contextlib.contextmanager
- def disable_faulthandler():
- # use sys.__stderr__ instead of sys.stderr, since regrtest replaces
- # sys.stderr with a StringIO which has no file descriptor when a test
- # is run with -W/--verbose3.
- fd = sys.__stderr__.fileno()
- is_enabled = faulthandler.is_enabled()
- try:
- faulthandler.disable()
- yield
- finally:
- if is_enabled:
- faulthandler.enable(file=fd, all_threads=True)
- def fd_count():
- """Count the number of open file descriptors.
- """
- if sys.platform.startswith(('linux', 'freebsd')):
- try:
- names = os.listdir("/proc/self/fd")
- # Subtract one because listdir() internally opens a file
- # descriptor to list the content of the /proc/self/fd/ directory.
- return len(names) - 1
- except FileNotFoundError:
- pass
- MAXFD = 256
- if hasattr(os, 'sysconf'):
- try:
- MAXFD = os.sysconf("SC_OPEN_MAX")
- except OSError:
- pass
- old_modes = None
- if sys.platform == 'win32':
- # bpo-25306, bpo-31009: Call CrtSetReportMode() to not kill the process
- # on invalid file descriptor if Python is compiled in debug mode
- try:
- import msvcrt
- msvcrt.CrtSetReportMode
- except (AttributeError, ImportError):
- # no msvcrt or a release build
- pass
- else:
- old_modes = {}
- for report_type in (msvcrt.CRT_WARN,
- msvcrt.CRT_ERROR,
- msvcrt.CRT_ASSERT):
- old_modes[report_type] = msvcrt.CrtSetReportMode(report_type, 0)
- try:
- count = 0
- for fd in range(MAXFD):
- try:
- # Prefer dup() over fstat(). fstat() can require input/output
- # whereas dup() doesn't.
- fd2 = os.dup(fd)
- except OSError as e:
- if e.errno != errno.EBADF:
- raise
- else:
- os.close(fd2)
- count += 1
- finally:
- if old_modes is not None:
- for report_type in (msvcrt.CRT_WARN,
- msvcrt.CRT_ERROR,
- msvcrt.CRT_ASSERT):
- msvcrt.CrtSetReportMode(report_type, old_modes[report_type])
- return count
- class SaveSignals:
- """
- Save an restore signal handlers.
- This class is only able to save/restore signal handlers registered
- by the Python signal module: see bpo-13285 for "external" signal
- handlers.
- """
- def __init__(self):
- import signal
- self.signal = signal
- self.signals = list(range(1, signal.NSIG))
- # SIGKILL and SIGSTOP signals cannot be ignored nor caught
- for signame in ('SIGKILL', 'SIGSTOP'):
- try:
- signum = getattr(signal, signame)
- except AttributeError:
- continue
- self.signals.remove(signum)
- self.handlers = {}
- def save(self):
- for signum in self.signals:
- handler = self.signal.getsignal(signum)
- if handler is None:
- # getsignal() returns None if a signal handler was not
- # registered by the Python signal module,
- # and the handler is not SIG_DFL nor SIG_IGN.
- #
- # Ignore the signal: we cannot restore the handler.
- continue
- self.handlers[signum] = handler
- def restore(self):
- for signum, handler in self.handlers.items():
- self.signal.signal(signum, handler)
- def with_pymalloc():
- import _testcapi
- return _testcapi.WITH_PYMALLOC
- class FakePath:
- """Simple implementing of the path protocol.
- """
- def __init__(self, path):
- self.path = path
- def __repr__(self):
- return f'<FakePath {self.path!r}>'
- def __fspath__(self):
- if (isinstance(self.path, BaseException) or
- isinstance(self.path, type) and
- issubclass(self.path, BaseException)):
- raise self.path
- else:
- return self.path
- class _ALWAYS_EQ:
- """
- Object that is equal to anything.
- """
- def __eq__(self, other):
- return True
- def __ne__(self, other):
- return False
- ALWAYS_EQ = _ALWAYS_EQ()
- @functools.total_ordering
- class _LARGEST:
- """
- Object that is greater than anything (except itself).
- """
- def __eq__(self, other):
- return isinstance(other, _LARGEST)
- def __lt__(self, other):
- return False
- LARGEST = _LARGEST()
- @functools.total_ordering
- class _SMALLEST:
- """
- Object that is less than anything (except itself).
- """
- def __eq__(self, other):
- return isinstance(other, _SMALLEST)
- def __gt__(self, other):
- return False
- SMALLEST = _SMALLEST()
|