Source code for pyomo.common.tee

#  ___________________________________________________________________________
#
#  Pyomo: Python Optimization Modeling Objects
#  Copyright (c) 2008-2025
#  National Technology and Engineering Solutions of Sandia, LLC
#  Under the terms of Contract DE-NA0003525 with National Technology and
#  Engineering Solutions of Sandia, LLC, the U.S. Government retains certain
#  rights in this software.
#  This software is distributed under the 3-clause BSD License.
#  ___________________________________________________________________________
#
#  This module was originally developed as part of the PyUtilib project
#  Copyright (c) 2008 Sandia Corporation.
#  This software is distributed under the BSD License.
#  Under the terms of Contract DE-AC04-94AL85000 with Sandia Corporation,
#  the U.S. Government retains certain rights in this software.
#  ___________________________________________________________________________
#
import collections.abc
import io
import logging
import os
import sys
import threading
import time

from pyomo.common.errors import DeveloperError
from pyomo.common.log import LoggingIntercept, LogStream

_poll_interval = 0.0001
_poll_rampup_limit = 0.099
# reader polling: number of timeouts with no data before increasing the
# polling interval
_poll_rampup = 10
# polling timeout when waiting to close threads.  This will bail on
#  closing threads after a minimum of 13.1 seconds and a worst case of
# ~(13.1 * #threads) seconds
_poll_timeout = 1  # 14 rounds: 0.0001 * 2**14 == 1.6384
_poll_timeout_deadlock = 100  # seconds
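
As a rough editorial illustration of these constants (a sketch, not part of
the module), the thread-join loop in TeeStream.close() below doubles its
polling interval each round, so about 14 rounds pass before the interval
saturates at _poll_timeout:

poll, waited, rounds = 0.0001, 0.0, 0  # mirrors _poll_interval / _poll_timeout
while poll < 1.0:
    waited += poll
    poll *= 2.0
    rounds += 1
print(rounds, round(waited, 4))  # 14 rounds, ~1.6383 s of cumulative waiting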

_noop = lambda: None
_mswindows = sys.platform.startswith('win')
try:
    if _mswindows:
        from msvcrt import get_osfhandle
        from win32pipe import PeekNamedPipe
        from win32file import ReadFile
    else:
        from select import select
    _peek_available = True
except ImportError:
    _peek_available = False
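
When these imports succeed, the merged reader thread (TeeStream._mergedReader
below) can check pipes for pending data without blocking.  A minimal
POSIX-only sketch of that pattern (illustrative only; the Windows branch uses
PeekNamedPipe / ReadFile from pywin32 instead):

import io
import os
from select import select

r, w = os.pipe()
os.write(w, b"pending output")
ready, _, _ = select([r], [], [], 0.0001)  # returns immediately; r is readable
if ready:
    print(os.read(r, io.DEFAULT_BUFFER_SIZE))  # b'pending output'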

logger = logging.getLogger(__name__)


class _SignalFlush(object):
    def __init__(self, ostream, handle):
        super().__setattr__('_ostream', ostream)
        super().__setattr__('_handle', handle)

    def flush(self):
        self._ostream.flush()
        self._handle.flush = True

    def __getattr__(self, attr):
        return getattr(self._ostream, attr)

    def __setattr__(self, attr, val):
        return setattr(self._ostream, attr, val)


class _AutoFlush(_SignalFlush):
    def write(self, data):
        self._ostream.write(data)
        self.flush()

    def writelines(self, data):
        self._ostream.writelines(data)
        self.flush()


class _fd_closer(object):
    """A context manager to handle closing a specified file descriptor

    Ideally we would use `os.fdopen(... closefd=True)`; however, it
    appears that Python ignores `closefd` on Windows.  This would
    eventually lead to the process exceeding the maximum number of open
    files (see Pyomo/pyomo#3587).  So, we will explicitly manage closing
    the file descriptors that we open using this context manager.

    """

    def __init__(self, fd):
        self.fd = fd

    def __enter__(self):
        return self.fd

    def __exit__(self, et, ev, tb):
        os.close(self.fd)
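

A small usage sketch for _fd_closer (an editorial illustration, not part of
the module): the descriptor is guaranteed to be closed when the block exits,
even on platforms where os.fdopen ignores closefd.

import os

fd = os.open(os.devnull, os.O_WRONLY)
with _fd_closer(fd) as raw_fd:
    os.write(raw_fd, b"discarded\n")
# fd is now closed, regardless of any wrapping file object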


class redirect_fd(object):
    """Redirect a file descriptor to a new file or file descriptor.

    This context manager will redirect the specified file descriptor to
    a specified new output target (either file name or file descriptor).
    For the special case of file descriptors 1 (stdout) and 2 (stderr),
    we will also make sure that the Python `sys.stdout` or `sys.stderr`
    remain usable: in the case of synchronize=True, the `sys.stdout` /
    `sys.stderr` file handles point to the new file descriptor.  When
    synchronize=False, we preserve the behavior of the Python file
    object (retargeting it to the original file descriptor if
    necessary).

    Parameters
    ----------
    fd: int
        The file descriptor to redirect

    output: int or str or None
        The new output target for `fd`: either another valid file
        descriptor (int) or a string with the file to open.  If `None`,
        then the fd is redirected to `os.devnull`.

    synchronize: bool
        If True, and `fd` is 1 or 2, then update `sys.stdout` or
        `sys.stderr` to also point to the new file descriptor

    """

    def __init__(self, fd=1, output=None, synchronize=True):
        if output is None:
            # /dev/null is used just to discard what is being printed
            output = os.devnull
        self.fd = fd
        self.std = {1: 'stdout', 2: 'stderr'}.get(self.fd, None)
        self.target = output
        self.target_file = None
        self.synchronize = synchronize
        self.original_file = None
        self.original_fd = None

    def __enter__(self):
        if self.std:
            # We used to flush original_file here.  We have removed that
            # because the std* streams are flushed by capture_output.
            # Flushing again here caused intermittent errors due to
            # closed file handles on OSX
            self.original_file = getattr(sys, self.std)
        # Duplicate the original standard file descriptor (file
        # descriptor 1 or 2) to a different file descriptor number
        self.original_fd = os.dup(self.fd)

        # Open a file descriptor pointing to the new file
        if isinstance(self.target, int):
            out_fd = self.target
        else:
            out_fd = os.open(self.target, os.O_WRONLY)

        # Duplicate the file descriptor for the opened file, closing and
        # overwriting/replacing the original fd.  Only make the new FD
        # inheritable if it is stdout/stderr
        os.dup2(out_fd, self.fd, inheritable=bool(self.std))

        # We no longer need this original file descriptor
        if not isinstance(self.target, int):
            os.close(out_fd)

        if self.synchronize and self.std:
            # Cause Python's stdout to point to our new file
            self.target_file = os.fdopen(self.fd, 'w', closefd=False)
            setattr(sys, self.std, self.target_file)

        return self

    def __exit__(self, et, ev, tb):
        # Close output: this either closes the new file that we opened,
        # or else the new file that points to the original (duplicated)
        # file descriptor
        if self.target_file is not None:
            self.target_file.flush()
            self.target_file.close()
            self.target_file = None
            setattr(sys, self.std, self.original_file)
        # Restore stdout's FD (implicitly closing the FD we opened)
        os.dup2(self.original_fd, self.fd, inheritable=bool(self.std))
        # Close the temporary FD
        os.close(self.original_fd)
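

A minimal usage sketch for redirect_fd (illustrative only; the log file name
is hypothetical): redirecting file descriptor 1 captures output that bypasses
sys.stdout, such as text printed by compiled extensions.

import os
from pyomo.common.tee import redirect_fd

with open('fd_capture.log', 'w') as logfile:  # hypothetical target file
    with redirect_fd(fd=1, output=logfile.fileno(), synchronize=False):
        os.write(1, b"written directly to the process stdout\n")
# fd 1 is restored here; fd_capture.log holds the low-level write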


class capture_output(object):
    """Context manager to capture output sent to sys.stdout and sys.stderr

    This is a drop-in substitute for PyUtilib's capture_output to
    temporarily redirect output to the provided stream or file.

    Parameters
    ----------
    output : io.TextIOBase, Sequence[io.TextIOBase], TeeStream, str, or None
        Output stream where all captured stdout/stderr data is sent.  If
        a ``str`` is provided, it is used as a file name and opened
        (potentially overwriting any existing file).  If ``None``, a
        :class:`io.StringIO` object is created and used.

    capture_fd : bool
        If True, we will also redirect the process file descriptors
        ``1`` (stdout), ``2`` (stderr), and the file descriptors from
        ``sys.stdout.fileno()`` and ``sys.stderr.fileno()`` to the
        ``output``.  This is useful for capturing output emitted
        directly to the process stdout / stderr by external compiled
        modules.

        Capturing and redirecting the file descriptors can cause loops
        in the output stream (where one of the `output` streams points
        to a file descriptor we just captured).  :py:class:`capture_output`
        will attempt to locate :py:class:`io.IOBase` streams in `output`
        that point to file descriptors that we just captured and replace
        them with temporary streams that point to (copies of) the
        original file descriptor.  In addition, :py:class:`capture_output`
        will look for :py:class:`~pyomo.common.log.LogStream` objects and
        will attempt to locate :py:class:`logging.StreamHandler` objects
        that would output to a redirected file descriptor and temporarily
        redirect those handlers to (copies of) the original file
        descriptor.

        Note that this process will cover the most common cases, but is
        by no means perfect.  Use of other output classes or customized
        log handlers may still result in output loops (usually
        manifesting in an error message about text being left in the
        output buffer).

    Returns
    -------
    io.TextIOBase
        This is the output stream object where all data is sent.

    """

    startup_shutdown = threading.Lock()

    def __init__(self, output=None, capture_fd=False):
        self.output = output
        self.output_stream = None
        self.old = None
        self.tee = None
        self.capture_fd = capture_fd
        self.context_stack = []

    def _enter_context(self, cm, prior_to=None):
        """Add the context manager to the context stack and return the
        result from calling the context manager's `__enter__()`

        """
        if prior_to is None:
            self.context_stack.append(cm)
        else:
            self.context_stack.insert(self.context_stack.index(prior_to), cm)
        return cm.__enter__()

    def _exit_context_stack(self, et, ev, tb):
        """Flush the context stack, calling __exit__() on all context managers

        One would usually use the contextlib.ExitStack to
        implement/manage the collection of context managers we are
        putting together.  The problem is that ExitStack will only call
        the __exit__ handlers up to the first one that returns an
        exception.  As we are expecting the possibility of one of the
        CMs here to raise an exception (usually from TeeStream when
        joining the reader threads), we will explicitly implement the
        stack management here so that we will guarantee that all
        __exit__ handlers will always be called.

        """
        FAIL = []
        while self.context_stack:
            try:
                cm = self.context_stack.pop()
                cm.__exit__(et, ev, tb)
            except:
                _stack = self.context_stack
                FAIL.append(f"{sys.exc_info()[1]} ({len(_stack)+1}: {cm}@{id(cm):x})")
        return FAIL

    def __enter__(self):
        if not capture_output.startup_shutdown.acquire(
            timeout=_poll_timeout_deadlock
        ):
            # This situation *shouldn't* happen.  If it does, it is
            # unlikely that the user can fix it (or even debug it).
            # Instead they should report it back to us.
            #
            # Breadcrumbs:
            #
            #  - The last time we hit this [5/2025], it was because we
            #    were using capture_output in a solver's __del__.  This
            #    led to the GC deleting the solver while another solver
            #    was trying to start up / run (so the other solver held
            #    the lock, but the GC interrupted that thread and
            #    wouldn't let go).
            raise DeveloperError("Deadlock starting capture_output")
        try:
            return self._enter_impl()
        finally:
            capture_output.startup_shutdown.release()

    def __exit__(self, et, ev, tb):
        if not capture_output.startup_shutdown.acquire(
            timeout=_poll_timeout_deadlock
        ):
            # See comments & breadcrumbs in __enter__() above.
            raise DeveloperError("Deadlock closing capture_output")
        try:
            return self._exit_impl(et, ev, tb)
        finally:
            capture_output.startup_shutdown.release()

    def _enter_impl(self):
        self.old = (sys.stdout, sys.stderr)
        old_fd = []
        for stream in self.old:
            try:
                stream.flush()
                try:
                    old_fd.append(stream.fileno())
                except (AttributeError, OSError):
                    old_fd.append(None)
            except (ValueError, OSError):
                old_fd.append(None)
        try:
            # We have an issue where we are (very aggressively)
            # commandeering the terminal.  This is what we intend, but
            # the side effect is that any errors generated by this
            # module (e.g., because the user gave us an invalid output
            # stream) get completely suppressed.  So, we will make an
            # exception to the output that we are catching and let
            # messages logged to THIS logger still be emitted to the
            # original stderr.
            if self.capture_fd:
                # Because we are also commandeering the FD that
                # underlies sys.stderr, we cannot just write to that
                # stream and instead will open a new stream to the
                # "original" FD (Note that we need to duplicate that FD,
                # as we will overwrite it when we get to redirect_fd
                # below).  If sys.stderr doesn't have a file descriptor,
                # we will fall back on the process stderr (FD=2).
                #
                # Note that we would like to use closefd=True, but can't
                # (see _fd_closer docs)
                log_stream = self._enter_context(
                    os.fdopen(
                        self._enter_context(_fd_closer(os.dup(old_fd[1] or 2))),
                        mode="w",
                        closefd=False,
                    )
                )
            else:
                log_stream = self.old[1]
            self._enter_context(
                LoggingIntercept(log_stream, logger=logger, level=None)
            )
            if isinstance(self.output, str):
                self.output_stream = self._enter_context(open(self.output, 'w'))
            elif self.output is None:
                self.output_stream = io.StringIO()
            else:
                self.output_stream = self.output
            if isinstance(self.output, TeeStream):
                self.tee = self._enter_context(self.output)
            elif isinstance(self.output_stream, collections.abc.Sequence):
                self.tee = self._enter_context(TeeStream(*self.output_stream))
            else:
                self.tee = self._enter_context(TeeStream(self.output_stream))
            fd_redirect = {}
            if self.capture_fd:
                tee_fd = (self.tee.STDOUT.fileno(), self.tee.STDERR.fileno())
                for i in range(2):
                    # Redirect the standard process file descriptor
                    # (1 or 2)
                    fd_redirect[i + 1] = self._enter_context(
                        redirect_fd(i + 1, tee_fd[i], synchronize=False)
                    )
                    # Redirect the file descriptor currently associated
                    # with sys.stdout / sys.stderr
                    fd = old_fd[i]
                    if fd and fd not in fd_redirect:
                        fd_redirect[fd] = self._enter_context(
                            redirect_fd(fd, tee_fd[i], synchronize=False)
                        )
            # We need to make sure that we didn't just capture the FD
            # that underlies a stream that we are outputting to.  Note
            # that when capture_fd==False, normal streams will be left
            # alone, but the lastResort _StderrHandler() will still be
            # replaced (needed because that handler uses the *current*
            # value of sys.stderr)
            ostreams = []
            for stream in self.tee.ostreams:
                if isinstance(stream, LogStream):
                    for handler_redirect in stream.redirect_streams(fd_redirect):
                        self._enter_context(handler_redirect, prior_to=self.tee)
                else:
                    try:
                        fd = stream.fileno()
                    except (AttributeError, OSError):
                        fd = None
                    if fd in fd_redirect:
                        # We just redirected this file descriptor so we
                        # can capture the output.  This makes a loop
                        # that we really want to break.  Undo the
                        # redirect by pointing our output stream back to
                        # the original file descriptor.
                        #
                        # Note that we would like to use closefd=True,
                        # but can't (see _fd_closer docs)
                        stream = self._enter_context(
                            os.fdopen(
                                self._enter_context(
                                    _fd_closer(os.dup(fd_redirect[fd].original_fd)),
                                    prior_to=self.tee,
                                ),
                                mode="w",
                                closefd=False,
                            ),
                            prior_to=self.tee,
                        )
                ostreams.append(stream)
            self.tee.ostreams = ostreams
        except:
            # Note: we will ignore any exceptions raised while exiting
            # the context managers and just reraise the original
            # exception.
            self._exit_context_stack(*sys.exc_info())
            raise
        sys.stdout = self.tee.STDOUT
        sys.stderr = self.tee.STDERR
        buf = self.tee.ostreams
        if len(buf) == 1:
            buf = buf[0]
        return buf

    def _exit_impl(self, et, ev, tb):
        # Check that we were nested correctly
        FAIL = []
        if self.tee is not None:
            if self.tee._stdout is not None and self.tee.STDOUT is not sys.stdout:
                FAIL.append(
                    'Captured output (%s) does not match sys.stdout (%s).'
                    % (self.tee._stdout, sys.stdout)
                )
            if self.tee._stderr is not None and self.tee.STDERR is not sys.stderr:
                FAIL.append(
                    'Captured output (%s) does not match sys.stderr (%s).'
                    % (self.tee._stderr, sys.stderr)
                )
        # Exit all context managers.  This includes
        #  - Restore any file descriptors we commandeered
        #  - Close / join the TeeStream
        #  - Close any opened files
        FAIL.extend(self._exit_context_stack(et, ev, tb))
        if self.old is not None:
            sys.stdout, sys.stderr = self.old
            self.old = None
        self.tee = None
        self.output_stream = None
        if FAIL:
            raise RuntimeError("\n".join(FAIL))

    def __del__(self):
        if self.tee is not None:
            self.__exit__(None, None, None)

    def setup(self):
        if self.old is not None:
            raise RuntimeError('Duplicate call to capture_output.setup.')
        return self.__enter__()

    def reset(self):
        return self.__exit__(None, None, None)
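

A brief usage sketch for capture_output (illustrative, not part of the
module): with no arguments it collects everything written to sys.stdout and
sys.stderr into an io.StringIO buffer; capture_fd=True additionally captures
writes made directly to the process file descriptors.

import os
from pyomo.common.tee import capture_output

with capture_output() as out:
    print("redirected through sys.stdout")
assert "redirected through sys.stdout" in out.getvalue()

with capture_output(capture_fd=True) as out:
    os.write(1, b"written straight to file descriptor 1\n")
assert "written straight to file descriptor 1" in out.getvalue()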


class _StreamHandle(object):
    """A stream handler object used by TeeStream

    This handler holds the two sides of the pipe used to communicate
    output generated by the main thread out to the handler thread
    (which passes the output on to the TeeStream's output streams).

    Note that this class is intimately tied to TeeStream and relies on
    that class for certain termination / cleanup events (including
    flushing and closing buffers)

    """

    def __init__(self, mode, buffering, encoding, newline):
        self.buffering = buffering
        self.newlines = newline
        self.flush = False
        self.read_pipe, self.write_pipe = os.pipe()
        if not buffering and 'b' not in mode:
            # While we support "unbuffered" behavior in text mode,
            # python does not
            buffering = -1
        self.write_file = os.fdopen(
            self.write_pipe,
            mode=mode,
            buffering=buffering,
            encoding=encoding,
            newline=newline,
            closefd=False,
        )
        if not self.buffering and buffering:
            # We want this stream to be unbuffered, but Python doesn't
            # allow it for text streams.  Mock up an unbuffered stream
            # using AutoFlush
            self.write_file = _AutoFlush(self.write_file, self)
        else:
            self.write_file = _SignalFlush(self.write_file, self)
        self.decoder_buffer = b''
        try:
            self.encoding = encoding or self.write_file.encoding
        except AttributeError:
            self.encoding = None
        if self.encoding:
            self.output_buffer = ''
        else:
            self.output_buffer = b''

    def fileno(self):
        return self.read_pipe

    def close(self):
        # Close both the file and the underlying file descriptor.  Note
        # that this may get called more than once.
        if self.write_file is not None:
            if not self.write_file.closed:
                self.write_file.flush()
                self.write_file.close()
            self.write_file = None
        if self.write_pipe is not None:
            # If someone else has closed the file descriptor, then
            # python raises an OSError
            try:
                os.close(self.write_pipe)
            except OSError:
                pass
            self.write_pipe = None

    def finalize(self, ostreams):
        # Note that this expects to be called by TeeStream *after*
        # TeeStream has called close(), so the output_buffer should
        # have been flushed and emptied.
        self.decodeIncomingBuffer()
        if ostreams:
            self.writeOutputBuffer(ostreams, True)
        os.close(self.read_pipe)

        if self.decoder_buffer:
            logger.error(
                "Stream handle closed with un-decoded characters "
                "in the decoder buffer that was not emitted to the "
                "output stream(s):\n\t%r" % (self.decoder_buffer,)
            )

    def decodeIncomingBuffer(self):
        if not self.encoding:
            self.output_buffer, self.decoder_buffer = self.decoder_buffer, b''
            return

        raw_len = len(self.decoder_buffer)
        chars = ''
        while raw_len:
            try:
                chars = self.decoder_buffer[:raw_len].decode(self.encoding)
                break
            except:
                pass
            # partial read of unicode character, try again with
            # a shorter bytes buffer
            raw_len -= 1
        if self.newlines is None:
            chars = chars.replace('\r\n', '\n').replace('\r', '\n')
        self.output_buffer += chars
        self.decoder_buffer = self.decoder_buffer[raw_len:]

    def writeOutputBuffer(self, ostreams, flush):
        if not self.encoding:
            ostring, self.output_buffer = self.output_buffer, b''
        elif self.buffering > 0 and not flush:
            EOL = self.output_buffer.rfind(self.newlines or '\n') + 1
            ostring = self.output_buffer[:EOL]
            self.output_buffer = self.output_buffer[EOL:]
        else:
            ostring, self.output_buffer = self.output_buffer, ''

        if not ostring:
            return

        for stream in ostreams:
            try:
                written = stream.write(ostring)
            except:
                my_repr = "<%s.%s @ %s>" % (
                    stream.__class__.__module__,
                    stream.__class__.__name__,
                    hex(id(stream)),
                )
                if my_repr in ostring:
                    # In the case of nested capture_outputs, all the
                    # handlers are left on the logger.  We want to make
                    # sure that we don't create an infinite loop by
                    # re-printing a message *this* object generated.
                    continue
                et, e, tb = sys.exc_info()
                msg = "Error writing to output stream %s:\n    %s: %s\n" % (
                    my_repr,
                    et.__name__,
                    e,
                )
                if getattr(stream, 'closed', False):
                    msg += "Output stream closed before all output was written to it."
                else:
                    msg += "Is this a writeable TextIOBase object?"
                logger.error(
                    f"{msg}\nThe following was left in the output buffer:\n"
                    f"    {ostring!r}"
                )
                continue
            if flush or (written and not self.buffering):
                getattr(stream, 'flush', _noop)()
            # Note: some derived file-like objects fail to return the
            # number of characters written (and implicitly return None).
            # If we get None, we will just assume that everything was
            # fine (as opposed to tossing the incomplete write error).
            if written is not None and written != len(ostring):
                my_repr = "<%s.%s @ %s>" % (
                    stream.__class__.__module__,
                    stream.__class__.__name__,
                    hex(id(stream)),
                )
                if my_repr in ostring[written:]:
                    continue
                logger.error(
                    "Incomplete write to output stream %s.\nThe following was "
                    "left in the output buffer:\n    %r" % (my_repr, ostring[written:])
                )


class TeeStream(object):

    def __init__(self, *ostreams, encoding=None, buffering=-1):
        self.ostreams = ostreams
        self.encoding = encoding
        self.buffering = buffering
        self._stdout = None
        self._stderr = None
        self._handles = []
        self._active_handles = []
        self._threads = []
        self._enter_count = 0

    @property
    def STDOUT(self):
        if self._stdout is None:
            b = self.buffering
            if b == -1:
                b = 1
            self._stdout = self.open(buffering=b)
        return self._stdout

    @property
    def STDERR(self):
        if self._stderr is None:
            b = self.buffering
            if b == -1:
                b = 0
            self._stderr = self.open(buffering=b)
        return self._stderr

    def open(self, mode='w', buffering=-1, encoding=None, newline=None):
        if encoding is None:
            encoding = self.encoding
        handle = _StreamHandle(mode, buffering, encoding, newline)
        # Note that it is VERY important to close file handles in the
        # same thread that opens them.  If you don't, you can see
        # deadlocks and a peculiar error ("libgcc_s.so.1 must be
        # installed for pthread_cancel to work"; see
        # https://bugs.python.org/issue18748)
        #
        # To accomplish this, we will keep two handle lists: one is the
        # set of "active" handles that the (merged reader) thread is
        # using, and the other the list of all handles so the original
        # thread can close them after the reader thread terminates.
        if handle.buffering:
            self._active_handles.append(handle)
        else:
            # Unbuffered handles should appear earlier in the list so
            # that they get processed first
            self._active_handles.insert(0, handle)
        self._handles.append(handle)
        self._start(handle)
        return handle.write_file

    def close(self, in_exception=False):
        # Close all open handles.  Note that as the threads may
        # immediately start removing handles from the list, it is
        # important that we iterate over a copy of the list.
        for h in list(self._handles):
            h.close()

        # Join all stream processing threads
        _poll = _poll_interval
        _timeout = 0.0
        FAIL = False
        while True:
            for th in self._threads:
                th.join(_poll)
                _timeout += _poll
            self._threads[:] = [th for th in self._threads if th.is_alive()]
            if not self._threads:
                break
            if _poll < _poll_timeout:
                _poll *= 2.0
                if _poll_timeout * 0.5 <= _poll < _poll_timeout:
                    if in_exception:
                        # We are already processing an exception: no
                        # reason to trigger another, nor to deadlock for
                        # an extended time.  Silently clean everything
                        # up (because emitting logger messages could
                        # trigger yet another exception and mask the
                        # true cause).
                        break
                    logger.warning(
                        "Significant delay observed waiting to join reader "
                        "threads, possible output stream deadlock"
                    )
            elif _timeout > _poll_timeout_deadlock:
                logger.error("TeeStream: deadlock observed joining reader threads")
                # Defer raising the exception until after we have
                # cleaned things up
                FAIL = True
                break

        for h in list(self._handles):
            h.finalize(self.ostreams)

        self._threads.clear()
        self._handles.clear()
        self._active_handles.clear()
        self._stdout = None
        self._stderr = None
        if FAIL:
            raise RuntimeError("TeeStream: deadlock observed joining reader threads")

    def __enter__(self):
        self._enter_count += 1
        return self

    def __exit__(self, et, ev, tb):
        if not self._enter_count:
            raise RuntimeError("TeeStream: exiting a context that was not entered")
        self._enter_count -= 1
        if not self._enter_count:
            self.close(et is not None)

    def __del__(self):
        # Implement __del__ to guarantee that file descriptors are closed
        # ... but only if we are not called by the GC in a handler thread
        if threading.current_thread() not in self._threads:
            self.close()

    def _start(self, handle):
        if not _peek_available:
            th = threading.Thread(target=self._streamReader, args=(handle,))
            th.daemon = True
            th.start()
            self._threads.append(th)
        elif not self._threads:
            th = threading.Thread(target=self._mergedReader)
            th.daemon = True
            th.start()
            self._threads.append(th)
        else:
            # The merged reader is already running... nothing additional
            # needs to be done
            pass

    def _streamReader(self, handle):
        while True:
            new_data = os.read(handle.read_pipe, io.DEFAULT_BUFFER_SIZE)
            if handle.flush:
                flush = True
                handle.flush = False
            else:
                flush = False
            if new_data:
                handle.decoder_buffer += new_data
            elif not flush:
                break

            # At this point, we have new data sitting in the
            # handle.decoder_buffer
            handle.decodeIncomingBuffer()

            # Now, output whatever we have decoded to the output streams
            handle.writeOutputBuffer(self.ostreams, flush)
        #
        # print("STREAM READER: DONE")

    def _mergedReader(self):
        noop = []
        handles = self._active_handles
        _poll = _poll_interval
        _fast_poll_ct = _poll_rampup
        new_data = ''  # something not None
        while handles:
            flush = False
            if new_data is None:
                # For performance reasons, we use very aggressive
                # polling at the beginning (_poll_interval) and then
                # ramp up to a much more modest polling interval
                # (_poll_rampup_limit) as the process runs and the
                # frequency of new data appearing on the pipe slows
                if _fast_poll_ct:
                    _fast_poll_ct -= 1
                    if not _fast_poll_ct:
                        _poll *= 10
                        if _poll < _poll_rampup_limit:
                            # reset the counter (to potentially increase
                            # the polling interval again)
                            _fast_poll_ct = _poll_rampup
            else:
                new_data = None
            if _mswindows:
                for handle in list(handles):
                    try:
                        pipe = get_osfhandle(handle.read_pipe)
                        numAvail = PeekNamedPipe(pipe, 0)[1]
                        if numAvail:
                            result, new_data = ReadFile(pipe, numAvail, None)
                            handle.decoder_buffer += new_data
                            break
                        elif handle.flush:
                            break
                    except:
                        handles.remove(handle)
                        # not None so the poll interval doesn't increase
                        new_data = ''
                if new_data is None and not handle.flush:
                    # PeekNamedPipe is non-blocking; to avoid swamping
                    # the core, sleep for a "short" amount of time
                    time.sleep(_poll)
                    continue
            else:
                # Because we could be *adding* handles to the TeeStream
                # while the _mergedReader is running, we want to
                # periodically time out and update the list of handles
                # that we are waiting for.  It is also critical that we
                # send select() a *copy* of the handles list, as we see
                # deadlocks when handles are added while select() is
                # waiting
                ready_handles = select(list(handles), noop, noop, _poll)[0]
                if ready_handles:
                    handle = ready_handles[0]
                    new_data = os.read(handle.read_pipe, io.DEFAULT_BUFFER_SIZE)
                    if new_data:
                        handle.decoder_buffer += new_data
                    else:
                        handles.remove(handle)
                        # not None so the poll interval doesn't increase
                        new_data = ''
                else:
                    for handle in handles:
                        if handle.flush:
                            break
                    else:
                        continue

            if handle.flush:
                new_data = ''
                flush = True
                handle.flush = False

            # At this point, we have new data sitting in the
            # handle.decoder_buffer
            handle.decodeIncomingBuffer()

            # Now, output whatever we have decoded to the output streams
            handle.writeOutputBuffer(self.ostreams, flush)
        #
        # print("MERGED READER: DONE")
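

Finally, a short usage sketch for TeeStream itself (illustrative only): a
single write to the STDOUT handle is fanned out to every output stream, and
exiting the context joins the reader threads so the buffers are complete.

import io
import sys
from pyomo.common.tee import TeeStream

buf = io.StringIO()
with TeeStream(buf, sys.stdout) as tee:
    tee.STDOUT.write("termination condition: optimal\n")
assert "termination condition: optimal" in buf.getvalue()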