# Source code for pyomo.common.tee

#  ___________________________________________________________________________
#
#  Pyomo: Python Optimization Modeling Objects
#  Copyright (c) 2008-2024
#  National Technology and Engineering Solutions of Sandia, LLC
#  Under the terms of Contract DE-NA0003525 with National Technology and
#  Engineering Solutions of Sandia, LLC, the U.S. Government retains certain
#  rights in this software.
#  This software is distributed under the 3-clause BSD License.
#  ___________________________________________________________________________
#
#  This module was originally developed as part of the PyUtilib project
#  Copyright (c) 2008 Sandia Corporation.
#  This software is distributed under the BSD License.
#  Under the terms of Contract DE-AC04-94AL85000 with Sandia Corporation,
#  the U.S. Government retains certain rights in this software.
#  ___________________________________________________________________________
#
import io
import logging
import os
import sys
import threading
import time
from io import StringIO

# Initial polling interval (in seconds).  This is the starting timeout
# used both when polling the pipes for new data and when joining the
# reader threads.
_poll_interval = 0.0001
# The merged reader stops ramping up its polling interval once the
# interval is no longer below this limit
_poll_rampup_limit = 0.099
# reader polling: number of timeouts with no data before increasing the
# polling interval
_poll_rampup = 10
# polling timeout when waiting to close threads.  This will bail on
# closing threads after a minimum of 13.1 seconds and a worst case of
# ~(13.1 * #threads) seconds
# NOTE(review): the 13.1 second figure matches 0.0001 * 2**17 and so
# appears to predate the current value of _poll_timeout; confirm and
# update.
_poll_timeout = 3  # 15 rounds: 0.0001 * 2**15 == 3.2768
_poll_timeout_deadlock = 200  # 21 rounds: 0.0001 * 2**21 == 209.7152

# Import the platform-specific functions that the merged reader thread
# uses to check pipes for pending data (PeekNamedPipe / ReadFile on
# Windows, select() elsewhere).  If they cannot be imported,
# _peek_available is False and TeeStream falls back to starting one
# reader thread per stream handle.
_mswindows = sys.platform.startswith('win')
try:
    if _mswindows:
        from msvcrt import get_osfhandle
        from win32pipe import PeekNamedPipe
        from win32file import ReadFile
    else:
        from select import select
    _peek_available = True
except ImportError:
    _peek_available = False

# Module-level logger; capture_output temporarily attaches a handler to
# it so messages from this module are still emitted while output is
# being captured.
logger = logging.getLogger(__name__)


class _SignalFlush:
    """Proxy around an output stream that records flush requests.

    Every attribute read or write that is not handled here is forwarded
    to the wrapped stream.  Calling :meth:`flush` flushes the wrapped
    stream and then sets ``handle.flush = True`` so that the owner of
    the handle can see that a flush was requested.

    Parameters
    ----------
    ostream
        The stream object being wrapped.
    handle
        Object whose ``flush`` attribute is set to ``True`` on every
        call to :meth:`flush`.
    """

    def __init__(self, ostream, handle):
        # Our own __setattr__ forwards to the wrapped stream, so go
        # straight to object.__setattr__ to store these two references
        # on the proxy itself.
        for name, value in (('_ostream', ostream), ('_handle', handle)):
            object.__setattr__(self, name, value)

    def flush(self):
        """Flush the wrapped stream, then flag the handle."""
        wrapped = self._ostream
        wrapped.flush()
        self._handle.flush = True

    def __getattr__(self, attr):
        # Only reached for names not found on the proxy: delegate the
        # lookup to the wrapped stream.
        wrapped = self._ostream
        return getattr(wrapped, attr)

    def __setattr__(self, attr, val):
        # Attribute assignments are applied to the wrapped stream, not
        # to the proxy.
        wrapped = self._ostream
        return setattr(wrapped, attr, val)


class _AutoFlush(_SignalFlush):
    """A :class:`_SignalFlush` proxy that flushes after every write.

    This is used to mock up an "unbuffered" stream on top of a stream
    object that is itself buffered.
    """

    def write(self, data):
        """Write `data` to the wrapped stream and immediately flush.

        Returns
        -------
        Whatever the wrapped stream's ``write()`` returned (for standard
        file objects, the number of characters / bytes written).
        """
        written = self._ostream.write(data)
        self.flush()
        # Propagate the wrapped stream's result so that callers that
        # inspect the return value of write() see the real count
        # instead of None.
        return written

    def writelines(self, data):
        """Write the sequence `data` to the wrapped stream and flush."""
        self._ostream.writelines(data)
        self.flush()


class redirect_fd(object):
    """Redirect a file descriptor to a new file or file descriptor.

    This context manager will redirect the specified file descriptor to
    a specified new output target (either file name or file
    descriptor).  For the special case of file descriptors 1 (stdout)
    and 2 (stderr), we will also make sure that the Python `sys.stdout`
    or `sys.stderr` remain usable: in the case of synchronize=True, the
    `sys.stdout` / `sys.stderr` file handles point to the new file
    descriptor.  When synchronize=False, we preserve the behavior of the
    Python file object (retargeting it to the original file descriptor
    if necessary).

    Parameters
    ----------
    fd: int
        The file descriptor to redirect

    output: int or str
        The new output target for `fd`: either another valid file
        descriptor (int) or a string with the file to open.

    synchronize: bool
        If True, and `fd` is 1 or 2, then update `sys.stdout` or
        `sys.stderr` to also point to the new file descriptor

    """

    def __init__(self, fd=1, output=None, synchronize=True):
        if output is None:
            # /dev/null is used just to discard what is being printed
            output = os.devnull
        self.fd = fd
        # Name of the matching sys attribute ('stdout' / 'stderr'), or
        # None when fd is not one of the standard descriptors
        self.std = {1: 'stdout', 2: 'stderr'}.get(self.fd, None)
        self.target = output
        self.target_file = None
        self.synchronize = synchronize
        self.original_file = None
        self.original_fd = None

    def __enter__(self):
        """Perform the redirection and return this object.

        Raises
        ------
        OSError
            If the target file cannot be opened or the descriptor
            cannot be duplicated.  In that case `fd` is left untouched,
            every descriptor opened here is closed again, and
            ``original_fd`` is reset to None.
        """
        if self.std:
            self.original_file = getattr(sys, self.std)
            # important: flush the current file buffer when redirecting
            self.original_file.flush()
        # Duplicate the original standard file descriptor(file
        # descriptor 1 or 2) to a different file descriptor number
        self.original_fd = os.dup(self.fd)

        opened_fd = None
        try:
            # Open a file descriptor pointing to the new file
            if isinstance(self.target, int):
                out_fd = self.target
            else:
                out_fd = opened_fd = os.open(self.target, os.O_WRONLY)

            # Duplicate the file descriptor for the opened file, closing and
            # overwriting the value for stdout (file descriptor 1).  Only
            # make the new FD inheritable if it is stdout/stderr
            os.dup2(out_fd, self.fd, inheritable=bool(self.std))
        except Exception:
            # Nothing was redirected: release the descriptors we
            # created so that a failed redirect does not leak them.
            if opened_fd is not None:
                os.close(opened_fd)
            os.close(self.original_fd)
            self.original_fd = None
            raise

        # We no longer need this original file descriptor
        if opened_fd is not None:
            os.close(opened_fd)

        if self.std:
            if self.synchronize:
                # Cause Python's stdout to point to our new file
                fd = self.fd
            else:
                # IF we are not synchronizing the std file object with
                # the redirected file descriptor, and IF the current
                # file object is pointing to the original file
                # descriptor that we just redirected, then we want to
                # retarget the std file to the original (duplicated)
                # target file descriptor.  This allows, e.g. Python to
                # still write to stdout when we redirect fd=1 to
                # /dev/null
                try:
                    old_std_fd = getattr(sys, self.std).fileno()
                    fd = self.original_fd if old_std_fd == self.fd else None
                except (io.UnsupportedOperation, AttributeError):
                    fd = None
            if fd is not None:
                self.target_file = os.fdopen(fd, 'w', closefd=False)
                setattr(sys, self.std, self.target_file)

        return self

    def __exit__(self, t, v, traceback):
        """Undo the redirection performed by :meth:`__enter__`."""
        # Close output: this either closes the new file that we opened,
        # or else the new file that points to the original (duplicated)
        # file descriptor
        if self.target_file is not None:
            self.target_file.flush()
            self.target_file.close()
            self.target_file = None
            setattr(sys, self.std, self.original_file)
        # Restore stdout's FD (implicitly closing the FD we opened)
        os.dup2(self.original_fd, self.fd, inheritable=bool(self.std))
        # Close the temporary FD
        os.close(self.original_fd)
class capture_output(object):
    """Context manager to capture output sent to sys.stdout and sys.stderr

    This is a drop-in substitute for PyUtilib's capture_output to
    temporarily redirect output to the provided stream or file.

    Parameters
    ----------
    output : io.TextIOBase, TeeStream, str, or None

        Output stream where all captured stdout/stderr data is sent.  If
        a ``str`` is provided, it is used as a file name and opened
        (potentially overwriting any existing file).  If ``None``, a
        :class:`io.StringIO` object is created and used.

    capture_fd : bool

        If True, we will also redirect the process file descriptors
        ``1`` (stdout), ``2`` (stderr), and the file descriptors from
        ``sys.stdout.fileno()`` and ``sys.stderr.fileno()`` to the
        ``output``.  This is useful for capturing output emitted
        directly to the process stdout / stderr by external compiled
        modules.

    Returns
    -------
    io.TextIOBase

        This is the output stream object where all data is sent.

    """

    def __init__(self, output=None, capture_fd=False):
        if output is None:
            output = StringIO()
        self.output = output
        self.output_file = None
        # (sys.stdout, sys.stderr) as they were when capturing started;
        # None while we are not capturing
        self.old = None
        self.tee = None
        self.capture_fd = capture_fd
        self.fd_redirect = None

    def __enter__(self):
        """Start capturing and return the stream receiving the output."""
        self.old = (sys.stdout, sys.stderr)
        if isinstance(self.output, str):
            self.output_stream = open(self.output, 'w')
        else:
            self.output_stream = self.output
        if isinstance(self.output, TeeStream):
            self.tee = self.output
        else:
            self.tee = TeeStream(self.output_stream)
        self.tee.__enter__()
        sys.stdout = self.tee.STDOUT
        sys.stderr = self.tee.STDERR
        if self.capture_fd:
            tee_fd = (self.tee.STDOUT.fileno(), self.tee.STDERR.fileno())
            self.fd_redirect = []
            for i in range(2):
                # Redirect the standard process file descriptor (1 or 2)
                self.fd_redirect.append(
                    redirect_fd(i + 1, tee_fd[i], synchronize=False)
                )
                # Redirect the file descriptor currently associated with
                # sys.stdout / sys.stderr
                try:
                    fd = self.old[i].fileno()
                except (AttributeError, OSError):
                    pass
                else:
                    if fd != i + 1:
                        self.fd_redirect.append(
                            redirect_fd(fd, tee_fd[i], synchronize=False)
                        )
            for fdr in self.fd_redirect:
                fdr.__enter__()
        # We have an issue where we are (very aggressively)
        # commandeering the terminal.  This is what we intend, but the
        # side effect is that any errors generated by this module (e.g.,
        # because the user gave us an invalid output stream) get
        # completely suppressed.  So, we will make an exception to the
        # output that we are catching and let messages logged to THIS
        # logger to still be emitted.
        if self.capture_fd:
            # Because we are also commandeering the FD that underlies
            # self.old[1], we cannot just write to that stream and
            # instead open a new stream to the original FD.
            #
            # Note that we need to duplicate the FD from the redirector,
            # as it will close the (temporary) `original_fd` descriptor
            # when it restores the actual original descriptor
            self.temp_log_stream = os.fdopen(
                os.dup(self.fd_redirect[-1].original_fd), mode="w", closefd=True
            )
        else:
            self.temp_log_stream = self.old[1]
        self.temp_log_handler = logging.StreamHandler(self.temp_log_stream)
        logger.addHandler(self.temp_log_handler)
        self._propagate = logger.propagate
        logger.propagate = False
        return self.output_stream

    def __exit__(self, et, ev, tb):
        """Stop capturing and restore the previous streams.

        Raises
        ------
        RuntimeError
            If ``sys.stdout`` was replaced by something else while the
            capture was active.  Exceptions raised while closing the
            underlying TeeStream are propagated as well, but only after
            ``sys.stdout`` / ``sys.stderr`` and the module logger have
            been restored.
        """
        # Restore any file descriptors we commandeered
        if self.fd_redirect is not None:
            for fdr in reversed(self.fd_redirect):
                fdr.__exit__(et, ev, tb)
            self.fd_redirect = None
        # Check and restore sys.stderr / sys.stdout
        FAIL = self.tee.STDOUT is not sys.stdout
        try:
            self.tee.__exit__(et, ev, tb)
            if self.output_stream is not self.output:
                self.output_stream.close()
        finally:
            # Closing the tee can raise (e.g., when the reader threads
            # cannot be joined).  Always put the process-wide state
            # back, otherwise stdout / stderr would stay pointed at the
            # dead tee and the logger would stay reconfigured.
            sys.stdout, sys.stderr = self.old
            self.old = None
            self.tee = None
            self.output_stream = None
            # Clean up our temporary override of the local logger
            self.temp_log_handler.flush()
            logger.removeHandler(self.temp_log_handler)
            if self.capture_fd:
                self.temp_log_stream.flush()
                self.temp_log_stream.close()
            logger.propagate = self._propagate
            self.temp_log_stream = None
            self.temp_log_handler = None
        if FAIL:
            raise RuntimeError('Captured output does not match sys.stdout.')

    def __del__(self):
        # Make sure an active capture is torn down when the object is
        # garbage collected
        if self.tee is not None:
            self.__exit__(None, None, None)

    def setup(self):
        """Start capturing (non-context-manager form of ``__enter__``).

        Raises
        ------
        RuntimeError
            If this object is already capturing output.
        """
        if self.old is not None:
            raise RuntimeError('Duplicate call to capture_output.setup.')
        return self.__enter__()

    def reset(self):
        """Stop capturing (non-context-manager form of ``__exit__``)."""
        return self.__exit__(None, None, None)
class _StreamHandle(object):
    """One pipe-backed stream managed by a TeeStream.

    Creating a handle opens an OS pipe.  The write end is wrapped in a
    Python file object (``write_file``) that is handed out to clients;
    a reader thread pulls bytes from the read end into
    ``decoder_buffer``, decodes them into ``output_buffer``, and
    forwards that buffer to the output streams.

    Parameters
    ----------
    mode : str
        File mode used to open the write end of the pipe.
    buffering : int
        Requested buffering: ``0`` is unbuffered; a positive value
        holds back partial lines until a flush.
    encoding : str or None
        Text encoding; when neither this nor the opened file provides
        one, the handle works on raw bytes.
    newline : str or None
        Newline handling; ``None`` translates ``\\r\\n`` and ``\\r`` to
        ``\\n`` when decoding.
    """

    def __init__(self, mode, buffering, encoding, newline):
        self.buffering = buffering
        self.newlines = newline
        # Set to True by the write_file wrapper when the client flushes
        self.flush = False
        self.read_pipe, self.write_pipe = os.pipe()
        if not buffering and 'b' not in mode:
            # While we support "unbuffered" behavior in text mode,
            # python does not
            buffering = -1
        self.write_file = os.fdopen(
            self.write_pipe,
            mode=mode,
            buffering=buffering,
            encoding=encoding,
            newline=newline,
            closefd=False,
        )
        if not self.buffering and buffering:
            # We want this stream to be unbuffered, but Python doesn't
            # allow it for text streams.  Mock up an unbuffered stream
            # using AutoFlush
            self.write_file = _AutoFlush(self.write_file, self)
        else:
            self.write_file = _SignalFlush(self.write_file, self)
        self.decoder_buffer = b''
        try:
            self.encoding = encoding or self.write_file.encoding
        except AttributeError:
            self.encoding = None
        if self.encoding:
            self.output_buffer = ''
        else:
            self.output_buffer = b''

    def __repr__(self):
        return "%s(%s)" % (self.buffering, id(self))

    def fileno(self):
        """Return the read end of the pipe."""
        return self.read_pipe

    def close(self):
        """Close the write side of the pipe (file object and descriptor)."""
        # Close both the file and the underlying file descriptor.  Note
        # that this may get called more than once.
        if self.write_file is not None:
            self.write_file.close()
            self.write_file = None

        if self.write_pipe is not None:
            # If someone else has closed the file descriptor, then
            # python raises an OSError
            try:
                os.close(self.write_pipe)
            except OSError:
                pass
            self.write_pipe = None

    def finalize(self, ostreams):
        """Emit any remaining data, close the read pipe, report leftovers."""
        self.decodeIncomingBuffer()
        if ostreams:
            self.writeOutputBuffer(ostreams, True)
        os.close(self.read_pipe)

        if self.output_buffer:
            logger.error(
                "Stream handle closed with a partial line "
                "in the output buffer that was not emitted to the "
                "output stream(s):\n\t'%s'" % (self.output_buffer,)
            )
        if self.decoder_buffer:
            logger.error(
                "Stream handle closed with un-decoded characters "
                "in the decoder buffer that was not emitted to the "
                "output stream(s):\n\t%r" % (self.decoder_buffer,)
            )

    def decodeIncomingBuffer(self):
        """Move the decodable part of ``decoder_buffer`` to ``output_buffer``.

        Bytes at the end of the buffer that do not (yet) form a complete
        character are left in ``decoder_buffer``.
        """
        if not self.encoding:
            self.output_buffer, self.decoder_buffer = self.decoder_buffer, b''
            return

        raw_len = len(self.decoder_buffer)
        chars = ''
        while raw_len:
            try:
                chars = self.decoder_buffer[:raw_len].decode(self.encoding)
                break
            except UnicodeError:
                pass
            # partial read of unicode character, try again with
            # a shorter bytes buffer
            raw_len -= 1
        if self.newlines is None:
            chars = chars.replace('\r\n', '\n').replace('\r', '\n')
        self.output_buffer += chars
        self.decoder_buffer = self.decoder_buffer[raw_len:]

    @staticmethod
    def _stream_repr(stream):
        # Tag used to identify an output stream in log messages
        cls = stream.__class__
        return "<%s.%s @ %s>" % (cls.__module__, cls.__name__, hex(id(stream)))

    def writeOutputBuffer(self, ostreams, flush):
        """Write the pending output buffer to every stream in `ostreams`.

        Parameters
        ----------
        ostreams : iterable
            Objects providing ``write()`` and ``flush()``.
        flush : bool
            If True, emit everything that is buffered and flush each
            stream.  Otherwise a handle with positive ``buffering`` only
            emits up to (and including) the last newline.

        Errors raised by an individual stream are logged and do not stop
        the remaining streams from being written.
        """
        if not self.encoding:
            ostring, self.output_buffer = self.output_buffer, b''
        elif self.buffering > 0 and not flush:
            EOL = self.output_buffer.rfind(self.newlines or '\n') + 1
            ostring = self.output_buffer[:EOL]
            self.output_buffer = self.output_buffer[EOL:]
        else:
            ostring, self.output_buffer = self.output_buffer, ''

        if not ostring:
            return

        # The "did we generate this message ourselves" checks below
        # search for a str tag, which is only meaningful (and only
        # legal) when the data being written is text.
        is_text = isinstance(ostring, str)

        for stream in ostreams:
            try:
                written = stream.write(ostring)
            except Exception as err:
                my_repr = self._stream_repr(stream)
                if is_text and my_repr in ostring:
                    # In the case of nested capture_outputs, all the
                    # handlers are left on the logger.  We want to make
                    # sure that we don't create an infinite loop by
                    # re-printing a message *this* object generated.
                    continue
                msg = "Error writing to output stream %s:\n %s: %s\n" % (
                    my_repr,
                    type(err).__name__,
                    err,
                )
                if getattr(stream, 'closed', False):
                    msg += "Output stream closed before all output was written to it."
                else:
                    msg += "Is this a writeable TextIOBase object?"
                logger.error(
                    f"{msg}\nThe following was left in the output buffer:\n"
                    f" {ostring!r}"
                )
                continue
            if flush or (written and not self.buffering):
                stream.flush()
            # Note: some derived file-like objects fail to return the
            # number of characters written (and implicitly return None).
            # If we get None, we will just assume that everything was
            # fine (as opposed to tossing the incomplete write error).
            if written is not None and written != len(ostring):
                my_repr = self._stream_repr(stream)
                if is_text and my_repr in ostring[written:]:
                    continue
                logger.error(
                    "Incomplete write to output stream %s.\nThe following was "
                    "left in the output buffer:\n %r" % (my_repr, ostring[written:])
                )
class TeeStream(object):
    """Duplicate everything written to its streams onto `ostreams`.

    Each stream handed out by :meth:`open` (including the lazily
    created :attr:`STDOUT` and :attr:`STDERR`) writes into an OS pipe.
    Background reader thread(s) read the pipes and write the data to
    every output stream given to the constructor.

    Parameters
    ----------
    *ostreams
        Objects (providing ``write()`` / ``flush()``) that receive the
        data.
    encoding : str or None
        Default encoding for streams created by :meth:`open`.
    buffering : int
        Default buffering for :attr:`STDOUT` / :attr:`STDERR`.  With the
        default of ``-1``, STDOUT uses ``buffering=1`` and STDERR uses
        ``buffering=0``.
    """

    def __init__(self, *ostreams, encoding=None, buffering=-1):
        self.ostreams = ostreams
        self.encoding = encoding
        self.buffering = buffering
        self._stdout = None
        self._stderr = None
        # All handles ever opened (closed by the thread calling close())
        self._handles = []
        # Handles the merged reader thread is still reading from
        self._active_handles = []
        self._threads = []

    @property
    def STDOUT(self):
        """Stream for stdout data (created on first access)."""
        if self._stdout is None:
            b = self.buffering
            if b == -1:
                b = 1
            self._stdout = self.open(buffering=b)
        return self._stdout

    @property
    def STDERR(self):
        """Stream for stderr data (created on first access)."""
        if self._stderr is None:
            b = self.buffering
            if b == -1:
                b = 0
            self._stderr = self.open(buffering=b)
        return self._stderr

    def open(self, mode='w', buffering=-1, encoding=None, newline=None):
        """Create a new stream whose output is sent to the output streams.

        Returns
        -------
        The writable file-like object for the new stream.
        """
        if encoding is None:
            encoding = self.encoding
        handle = _StreamHandle(mode, buffering, encoding, newline)
        # Note that is it VERY important to close file handles in the
        # same thread that opens it.  If you don't you can see deadlocks
        # and a peculiar error ("libgcc_s.so.1 must be installed for
        # pthread_cancel to work"; see https://bugs.python.org/issue18748)
        #
        # To accomplish this, we will keep two handle lists: one is the
        # set of "active" handles that the (merged reader) thread is
        # using, and the other the list of all handles so the original
        # thread can close them after the reader thread terminates.
        if handle.buffering:
            self._active_handles.append(handle)
        else:
            # Unbuffered handles should appear earlier in the list so
            # that they get processed first
            self._active_handles.insert(0, handle)
        self._handles.append(handle)
        self._start(handle)
        return handle.write_file

    def close(self, in_exception=False):
        """Close all streams, join the reader threads, and finalize.

        Parameters
        ----------
        in_exception : bool
            If True, stop waiting for the reader threads once the join
            timeout reaches ``_poll_timeout`` instead of warning and
            continuing to wait.

        Raises
        ------
        RuntimeError
            If the reader threads still have not terminated when the
            join timeout reaches ``_poll_timeout_deadlock``.
        """
        # Close all open handles.  Note that as the threads may
        # immediately start removing handles from the list, it is
        # important that we iterate over a copy of the list.
        for h in list(self._handles):
            h.close()

        # Join all stream processing threads
        _poll = _poll_interval
        while True:
            for th in self._threads:
                th.join(_poll)
            self._threads[:] = [th for th in self._threads if th.is_alive()]
            if not self._threads:
                break
            # Exponential backoff on the join timeout
            _poll *= 2
            if _poll_timeout <= _poll < 2 * _poll_timeout:
                if in_exception:
                    # We are already processing an exception: no reason
                    # to trigger another, nor to deadlock for an extended time
                    break
                logger.warning(
                    "Significant delay observed waiting to join reader "
                    "threads, possible output stream deadlock"
                )
            elif _poll >= _poll_timeout_deadlock:
                raise RuntimeError(
                    "TeeStream: deadlock observed joining reader threads"
                )

        for h in list(self._handles):
            h.finalize(self.ostreams)

        self._threads.clear()
        self._handles.clear()
        self._active_handles.clear()
        self._stdout = None
        self._stderr = None

    def __enter__(self):
        return self

    def __exit__(self, et, ev, tb):
        self.close(et is not None)

    def __del__(self):
        # Implement __del__ to guarantee that file descriptors are closed
        # ... but only if we are not called by the GC in the handler thread
        if threading.current_thread() not in self._threads:
            self.close()

    def _start(self, handle):
        """Make sure a reader thread is servicing `handle`."""
        if not _peek_available:
            # No way to poll the pipes: one blocking reader per handle
            th = threading.Thread(target=self._streamReader, args=(handle,))
            th.daemon = True
            th.start()
            self._threads.append(th)
        elif not self._threads:
            th = threading.Thread(target=self._mergedReader)
            th.daemon = True
            th.start()
            self._threads.append(th)
        else:
            # The merged reader is already running... nothing additional
            # needs to be done
            pass

    def _streamReader(self, handle):
        """Thread target: blocking reader dedicated to a single handle."""
        while True:
            new_data = os.read(handle.read_pipe, io.DEFAULT_BUFFER_SIZE)
            if handle.flush:
                flush = True
                handle.flush = False
            else:
                flush = False
            if new_data:
                handle.decoder_buffer += new_data
            elif not flush:
                # Empty read with no pending flush: we are done
                break

            # At this point, we have new data sitting in the
            # handle.decoder_buffer
            handle.decodeIncomingBuffer()

            # Now, output whatever we have decoded to the output streams
            handle.writeOutputBuffer(self.ostreams, flush)
        #
        # print("STREAM READER: DONE")

    def _mergedReader(self):
        """Thread target: single reader polling all active handles."""
        noop = []
        handles = self._active_handles
        _poll = _poll_interval
        _fast_poll_ct = _poll_rampup
        new_data = ''  # something not None
        while handles:
            flush = False
            if new_data is None:
                # For performance reasons, we use very aggressive
                # polling at the beginning (_poll_interval) and then
                # ramp up to a much more modest polling interval
                # (_poll_rampup_limit) as the process runs and the
                # frequency of new data appearing on the pipe slows
                if _fast_poll_ct:
                    _fast_poll_ct -= 1
                    if not _fast_poll_ct:
                        _poll *= 10
                        if _poll < _poll_rampup_limit:
                            # reset the counter (to potentially increase
                            # the polling interval again)
                            _fast_poll_ct = _poll_rampup
            else:
                new_data = None
            if _mswindows:
                for handle in list(handles):
                    try:
                        if handle.flush:
                            flush = True
                            handle.flush = False
                        pipe = get_osfhandle(handle.read_pipe)
                        numAvail = PeekNamedPipe(pipe, 0)[1]
                        if numAvail:
                            result, new_data = ReadFile(pipe, numAvail, None)
                            handle.decoder_buffer += new_data
                            break
                    except Exception:
                        # Any error polling / reading the pipe drops the
                        # handle from the active list
                        handles.remove(handle)
                        new_data = ''  # not None so the poll interval doesn't increase
                if new_data is None and not flush:
                    # PeekNamedPipe is non-blocking; to avoid swamping
                    # the core, sleep for a "short" amount of time
                    time.sleep(_poll)
                    continue
            else:
                # Because we could be *adding* handles to the TeeStream
                # while the _mergedReader is running, we want to
                # periodically time out and update the list of handles
                # that we are waiting for.  It is also critical that we
                # send select() a *copy* of the handles list, as we see
                # deadlocks when handles are added while select() is
                # waiting
                ready_handles = select(list(handles), noop, noop, _poll)[0]
                if ready_handles:
                    handle = ready_handles[0]
                    new_data = os.read(handle.read_pipe, io.DEFAULT_BUFFER_SIZE)
                    if new_data:
                        handle.decoder_buffer += new_data
                    else:
                        # Empty read: stop polling this handle
                        handles.remove(handle)
                        new_data = ''  # not None so the poll interval doesn't increase
                else:
                    # Nothing to read: see if any handle was flushed
                    for handle in handles:
                        if handle.flush:
                            new_data = ''
                            break
                    else:
                        new_data = None
                        continue

                if handle.flush:
                    flush = True
                    handle.flush = False

            # At this point, we have new data sitting in the
            # handle.decoder_buffer
            handle.decodeIncomingBuffer()

            # Now, output whatever we have decoded to the output streams
            handle.writeOutputBuffer(self.ostreams, flush)
        #
        # print("MERGED READER: DONE")