#  ___________________________________________________________________________
#
#  Pyomo: Python Optimization Modeling Objects
#  Copyright (c) 2008-2024
#  National Technology and Engineering Solutions of Sandia, LLC
#  Under the terms of Contract DE-NA0003525 with National Technology and
#  Engineering Solutions of Sandia, LLC, the U.S. Government retains certain
#  rights in this software.
#  This software is distributed under the 3-clause BSD License.
#  ___________________________________________________________________________
#
#  This module was originally developed as part of the PyUtilib project
#  Copyright (c) 2008 Sandia Corporation.
#  This software is distributed under the BSD License.
#  Under the terms of Contract DE-AC04-94AL85000 with Sandia Corporation,
#  the U.S. Government retains certain rights in this software.
#  ___________________________________________________________________________
#
import io
import logging
import os
import sys
import threading
import time
from io import StringIO

_poll_interval = 0.0001
_poll_rampup_limit = 0.099
# reader polling: number of timeouts with no data before increasing the
# polling interval
_poll_rampup = 10
# polling timeout when waiting to close threads.  This will bail on
# closing threads after a minimum of 13.1 seconds and a worst case of
# ~(13.1 * #threads) seconds
_poll_timeout = 3  # 15 rounds: 0.0001 * 2**15 == 3.2768
_poll_timeout_deadlock = 200  # 21 rounds: 0.0001 * 2**21 == 209.7152
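
# (A worked check of the 13.1 second figure above, assuming it counts the
# cumulative join timeouts in TeeStream.close(): the timeout doubles each
# round starting from _poll_interval, so by the time it passes
# 2 * _poll_timeout the total elapsed wait per thread is
# 0.0001 * (2**17 - 1) == 13.1071 seconds.)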

_mswindows = sys.platform.startswith('win')
try:
    if _mswindows:
        from msvcrt import get_osfhandle
        from win32pipe import PeekNamedPipe
        from win32file import ReadFile
    else:
        from select import select
    _peek_available = True
except ImportError:
    _peek_available = False
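
# Note: when _peek_available is False (e.g., pywin32 is not installed on
# Windows), TeeStream falls back to one blocking reader thread per handle
# (_streamReader) instead of the single polling _mergedReader thread.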

logger = logging.getLogger(__name__)


class redirect_fd(object):
    """Redirect a file descriptor to a new file or file descriptor.

    This context manager will redirect the specified file descriptor to
    a specified new output target (either file name or file
    descriptor).  For the special case of file descriptors 1 (stdout)
    and 2 (stderr), we will also make sure that the Python `sys.stdout`
    or `sys.stderr` remain usable: in the case of synchronize=True, the
    `sys.stdout` / `sys.stderr` file handles point to the new file
    descriptor.  When synchronize=False, we preserve the behavior of
    the Python file object (retargeting it to the original file
    descriptor if necessary).

    Parameters
    ----------
    fd: int
        The file descriptor to redirect

    output: int or str
        The new output target for `fd`: either another valid file
        descriptor (int) or a string with the file to open.

    synchronize: bool
        If True, and `fd` is 1 or 2, then update `sys.stdout` or
        `sys.stderr` to also point to the new file descriptor
    """

    def __init__(self, fd=1, output=None, synchronize=True):
        if output is None:
            # /dev/null is used just to discard what is being printed
            output = os.devnull
        self.fd = fd
        self.std = {1: 'stdout', 2: 'stderr'}.get(self.fd, None)
        self.target = output
        self.target_file = None
        self.synchronize = synchronize
        self.original_file = None
        self.original_fd = None

    def __enter__(self):
        if self.std:
            # important: flush the current file buffer when redirecting
            getattr(sys, self.std).flush()
            self.original_file = getattr(sys, self.std)
        # Duplicate the original standard file descriptor (file
        # descriptor 1 or 2) to a different file descriptor number
        self.original_fd = os.dup(self.fd)

        # Open a file descriptor pointing to the new file
        if isinstance(self.target, int):
            out_fd = self.target
        else:
            out_fd = os.open(self.target, os.O_WRONLY)

        # Duplicate the file descriptor for the opened file, closing
        # and overwriting the value for stdout (file descriptor 1).
        # Only make the new FD inheritable if it is stdout/stderr
        os.dup2(out_fd, self.fd, inheritable=bool(self.std))

        # We no longer need this original file descriptor
        if out_fd is not self.target:
            os.close(out_fd)

        if self.std:
            if self.synchronize:
                # Cause Python's stdout to point to our new file
                fd = self.fd
            else:
                # IF we are not synchronizing the std file object with
                # the redirected file descriptor, and IF the current
                # file object is pointing to the original file
                # descriptor that we just redirected, then we want to
                # retarget the std file to the original (duplicated)
                # target file descriptor.  This allows, e.g., Python to
                # still write to stdout when we redirect fd=1 to
                # /dev/null
                try:
                    old_std_fd = getattr(sys, self.std).fileno()
                    fd = self.original_fd if old_std_fd == self.fd else None
                except (io.UnsupportedOperation, AttributeError):
                    fd = None
            if fd is not None:
                self.target_file = os.fdopen(fd, 'w', closefd=False)
                setattr(sys, self.std, self.target_file)
        return self

    def __exit__(self, t, v, traceback):
        # Close output: this either closes the new file that we opened,
        # or else the new file that points to the original (duplicated)
        # file descriptor
        if self.target_file is not None:
            self.target_file.flush()
            self.target_file.close()
            self.target_file = None
            setattr(sys, self.std, self.original_file)
        # Restore stdout's FD (implicitly closing the FD we opened)
        os.dup2(self.original_fd, self.fd, inheritable=bool(self.std))
        # Close the temporary FD
        os.close(self.original_fd)
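

# Example usage (a minimal illustrative sketch).  With synchronize=False,
# C-level and subprocess writes to fd 1 are discarded (output=None targets
# os.devnull), while Python-level `print` still reaches the original
# console because `sys.stdout` is retargeted to the duplicated descriptor:
#
#     with redirect_fd(fd=1, output=None, synchronize=False):
#         os.system('echo discarded')  # fd 1 now points at os.devnull
#         print("still visible")       # sys.stdout -> duplicated original fd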


class capture_output(object):
    """Drop-in substitute for PyUtilib's capture_output.

    Takes in a StringIO, file-like object, or filename and temporarily
    redirects output to a string buffer.
    """

    def __init__(self, output=None, capture_fd=False):
        if output is None:
            output = StringIO()
        self.output = output
        self.output_file = None
        self.old = None
        self.tee = None
        self.capture_fd = capture_fd
        self.fd_redirect = None

    def __enter__(self):
        if isinstance(self.output, str):
            self.output_stream = open(self.output, 'w')
        else:
            self.output_stream = self.output
        self.old = (sys.stdout, sys.stderr)
        self.tee = TeeStream(self.output_stream)
        self.tee.__enter__()
        sys.stdout = self.tee.STDOUT
        sys.stderr = self.tee.STDERR
        if self.capture_fd:
            self.fd_redirect = (
                redirect_fd(1, sys.stdout.fileno()),
                redirect_fd(2, sys.stderr.fileno()),
            )
            self.fd_redirect[0].__enter__()
            self.fd_redirect[1].__enter__()
        return self.output_stream

    def __exit__(self, et, ev, tb):
        if self.fd_redirect is not None:
            self.fd_redirect[1].__exit__(et, ev, tb)
            self.fd_redirect[0].__exit__(et, ev, tb)
            self.fd_redirect = None
        FAIL = self.tee.STDOUT is not sys.stdout
        self.tee.__exit__(et, ev, tb)
        if self.output_stream is not self.output:
            self.output_stream.close()
        sys.stdout, sys.stderr = self.old
        self.old = None
        self.tee = None
        self.output_stream = None
        if FAIL:
            raise RuntimeError('Captured output does not match sys.stdout.')

    def __del__(self):
        if self.tee is not None:
            self.__exit__(None, None, None)

    def setup(self):
        if self.old is not None:
            raise RuntimeError('Duplicate call to capture_output.setup.')
        return self.__enter__()

    def reset(self):
        return self.__exit__(None, None, None)
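

# Example usage (a minimal sketch).  With the default arguments, the
# context manager yields a StringIO that accumulates everything written
# to sys.stdout and sys.stderr inside the block:
#
#     with capture_output() as OUT:
#         print("hello, world")
#     assert OUT.getvalue() == "hello, world\n"
#
# Passing capture_fd=True additionally redirects file descriptors 1 and 2,
# so output written below the Python level (e.g., by compiled extensions
# or subprocesses) is captured as well.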


class _StreamHandle(object):
    def __init__(self, mode, buffering, encoding, newline):
        self.buffering = buffering
        self.newlines = newline
        self.read_pipe, self.write_pipe = os.pipe()
        if not buffering and 'b' not in mode:
            # While we support "unbuffered" behavior in text mode,
            # python does not
            buffering = -1
        self.write_file = os.fdopen(
            self.write_pipe,
            mode=mode,
            buffering=buffering,
            encoding=encoding,
            newline=newline,
            closefd=False,
        )
        self.decoder_buffer = b''
        try:
            self.encoding = encoding or self.write_file.encoding
        except AttributeError:
            self.encoding = None
        if self.encoding:
            self.output_buffer = ''
        else:
            self.output_buffer = b''

    def __repr__(self):
        return "%s(%s)" % (self.buffering, id(self))

    def fileno(self):
        return self.read_pipe

    def close(self):
        # Close both the file and the underlying file descriptor.  Note
        # that this may get called more than once.
        if self.write_file is not None:
            self.write_file.close()
            self.write_file = None

        if self.write_pipe is not None:
            # If someone else has closed the file descriptor, then
            # python raises an OSError
            try:
                os.close(self.write_pipe)
            except OSError:
                pass
            self.write_pipe = None

    def finalize(self, ostreams):
        self.decodeIncomingBuffer()
        if ostreams:
            # Turn off buffering for the final write
            self.buffering = 0
            self.writeOutputBuffer(ostreams)
        os.close(self.read_pipe)

        if self.output_buffer:
            logger.error(
                "Stream handle closed with a partial line "
                "in the output buffer that was not emitted to the "
                "output stream(s):\n\t'%s'" % (self.output_buffer,)
            )
        if self.decoder_buffer:
            logger.error(
                "Stream handle closed with un-decoded characters "
                "in the decoder buffer that were not emitted to the "
                "output stream(s):\n\t%r" % (self.decoder_buffer,)
            )

    def decodeIncomingBuffer(self):
        if not self.encoding:
            self.output_buffer, self.decoder_buffer = self.decoder_buffer, b''
            return

        raw_len = len(self.decoder_buffer)
        chars = ''
        while raw_len:
            try:
                chars = self.decoder_buffer[:raw_len].decode(self.encoding)
                break
            except:
                pass
            # partial read of a unicode character: try again with a
            # shorter bytes buffer
            raw_len -= 1
        if self.newlines is None:
            chars = chars.replace('\r\n', '\n').replace('\r', '\n')
        self.output_buffer += chars
        self.decoder_buffer = self.decoder_buffer[raw_len:]

    def writeOutputBuffer(self, ostreams):
        if not self.encoding:
            ostring, self.output_buffer = self.output_buffer, b''
        elif self.buffering == 1:
            EOL = self.output_buffer.rfind(self.newlines or '\n') + 1
            ostring = self.output_buffer[:EOL]
            self.output_buffer = self.output_buffer[EOL:]
        else:
            ostring, self.output_buffer = self.output_buffer, ''

        if not ostring:
            return

        for stream in ostreams:
            try:
                written = stream.write(ostring)
            except:
                written = 0
            if written and not self.buffering:
                stream.flush()
            # Note: some derived file-like objects fail to return the
            # number of characters written (and implicitly return
            # None).  If we get None, we will just assume that
            # everything was fine (as opposed to tossing the
            # incomplete write error).
            if written is not None and written != len(ostring):
                logger.error(
                    "Output stream (%s) closed before all output was "
                    "written to it.  The following was left in "
                    "the output buffer:\n\t%r" % (stream, ostring[written:])
                )
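

# Illustration of the _StreamHandle decode/flush cycle (internal API; a
# sketch for exposition, not part of the public interface).  A
# line-buffered handle decodes raw bytes read from the pipe and emits
# only complete lines, retaining any trailing partial line:
#
#     h = _StreamHandle('w', 1, None, None)
#     h.write_file.write('full line\npartial')  # '\n' flushes to the pipe
#     h.decoder_buffer += os.read(h.read_pipe, io.DEFAULT_BUFFER_SIZE)
#     h.decodeIncomingBuffer()           # bytes -> str in output_buffer
#     h.writeOutputBuffer([sys.stdout])  # emits 'full line\n'; keeps 'partial'
#     h.close()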


class TeeStream(object):
    def __init__(self, *ostreams, encoding=None):
        self.ostreams = ostreams
        self.encoding = encoding
        self._stdout = None
        self._stderr = None
        self._handles = []
        self._active_handles = []
        self._threads = []

    @property
    def STDOUT(self):
        if self._stdout is None:
            self._stdout = self.open(buffering=1)
        return self._stdout

    @property
    def STDERR(self):
        if self._stderr is None:
            self._stderr = self.open(buffering=0)
        return self._stderr

    def open(self, mode='w', buffering=-1, encoding=None, newline=None):
        if encoding is None:
            encoding = self.encoding
        handle = _StreamHandle(mode, buffering, encoding, newline)
        # Note that it is VERY important to close file handles in the
        # same thread that opened them.  If you don't, you can see
        # deadlocks and a peculiar error ("libgcc_s.so.1 must be
        # installed for pthread_cancel to work"; see
        # https://bugs.python.org/issue18748)
        #
        # To accomplish this, we will keep two handle lists: one is the
        # set of "active" handles that the (merged reader) thread is
        # using, and the other the list of all handles so the original
        # thread can close them after the reader thread terminates.
        if handle.buffering:
            self._active_handles.append(handle)
        else:
            # Unbuffered handles should appear earlier in the list so
            # that they get processed first
            self._active_handles.insert(0, handle)
        self._handles.append(handle)
        self._start(handle)
        return handle.write_file

    def close(self, in_exception=False):
        # Close all open handles.  Note that as the threads may
        # immediately start removing handles from the list, it is
        # important that we iterate over a copy of the list.
        for h in list(self._handles):
            h.close()

        # Join all stream processing threads
        _poll = _poll_interval
        while True:
            for th in self._threads:
                th.join(_poll)
            self._threads[:] = [th for th in self._threads if th.is_alive()]
            if not self._threads:
                break
            _poll *= 2
            if _poll_timeout <= _poll < 2 * _poll_timeout:
                if in_exception:
                    # We are already processing an exception: no reason
                    # to trigger another, nor to deadlock for an
                    # extended time
                    break
                logger.warning(
                    "Significant delay observed waiting to join reader "
                    "threads, possible output stream deadlock"
                )
            elif _poll >= _poll_timeout_deadlock:
                raise RuntimeError(
                    "TeeStream: deadlock observed joining reader threads"
                )

        for h in list(self._handles):
            h.finalize(self.ostreams)

        self._threads.clear()
        self._handles.clear()
        self._active_handles.clear()
        self._stdout = None
        self._stderr = None

    def __enter__(self):
        return self

    def __exit__(self, et, ev, tb):
        self.close(et is not None)

    def __del__(self):
        # Implement __del__ to guarantee that file descriptors are
        # closed ... but only if we are not called by the GC in the
        # handler thread
        if threading.current_thread() not in self._threads:
            self.close()

    def _start(self, handle):
        if not _peek_available:
            th = threading.Thread(target=self._streamReader, args=(handle,))
            th.daemon = True
            th.start()
            self._threads.append(th)
        elif not self._threads:
            th = threading.Thread(target=self._mergedReader)
            th.daemon = True
            th.start()
            self._threads.append(th)
        else:
            # The merged reader is already running... nothing additional
            # needs to be done
            pass

    def _streamReader(self, handle):
        while True:
            new_data = os.read(handle.read_pipe, io.DEFAULT_BUFFER_SIZE)
            if not new_data:
                break
            handle.decoder_buffer += new_data

            # At this point, we have new data sitting in the
            # handle.decoder_buffer
            handle.decodeIncomingBuffer()

            # Now, output whatever we have decoded to the output streams
            handle.writeOutputBuffer(self.ostreams)
        #
        # print("STREAM READER: DONE")

    def _mergedReader(self):
        noop = []
        handles = self._active_handles
        _poll = _poll_interval
        _fast_poll_ct = _poll_rampup
        new_data = ''  # something not None
        while handles:
            if new_data is None:
                # For performance reasons, we use very aggressive
                # polling at the beginning (_poll_interval) and then
                # ramp up to a much more modest polling interval
                # (_poll_rampup_limit) as the process runs and the
                # frequency of new data appearing on the pipe slows
                if _fast_poll_ct:
                    _fast_poll_ct -= 1
                    if not _fast_poll_ct:
                        _poll *= 10
                        if _poll < _poll_rampup_limit:
                            # reset the counter (to potentially
                            # increase the polling interval again)
                            _fast_poll_ct = _poll_rampup
            else:
                new_data = None

            if _mswindows:
                for handle in list(handles):
                    try:
                        pipe = get_osfhandle(handle.read_pipe)
                        numAvail = PeekNamedPipe(pipe, 0)[1]
                        if numAvail:
                            result, new_data = ReadFile(pipe, numAvail, None)
                            handle.decoder_buffer += new_data
                            break
                    except:
                        handles.remove(handle)
                        new_data = None
                if new_data is None:
                    # PeekNamedPipe is non-blocking; to avoid swamping
                    # the core, sleep for a "short" amount of time
                    time.sleep(_poll)
                    continue
            else:
                # Because we could be *adding* handles to the TeeStream
                # while the _mergedReader is running, we want to
                # periodically time out and update the list of handles
                # that we are waiting for.  It is also critical that we
                # send select() a *copy* of the handles list, as we see
                # deadlocks when handles are added while select() is
                # waiting
                ready_handles = select(list(handles), noop, noop, _poll)[0]
                if not ready_handles:
                    new_data = None
                    continue

                handle = ready_handles[0]
                new_data = os.read(handle.read_pipe, io.DEFAULT_BUFFER_SIZE)
                if not new_data:
                    handles.remove(handle)
                    continue
                handle.decoder_buffer += new_data

            # At this point, we have new data sitting in the
            # handle.decoder_buffer
            handle.decodeIncomingBuffer()

            # Now, output whatever we have decoded to the output streams
            handle.writeOutputBuffer(self.ostreams)
        #
        # print("MERGED READER: DONE")
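

# Example usage (a minimal sketch): tee a single write to both the real
# stdout and an in-memory buffer.  Writes to the STDOUT handle are
# forwarded to every stream passed to the constructor:
#
#     buf = StringIO()
#     with TeeStream(sys.stdout, buf) as t:
#         t.STDOUT.write("to both\n")
#     assert buf.getvalue() == "to both\n"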