Ok so I'm trying to run a C program from a Python script. Currently I'm using a test C program:

#include <stdio.h>
#include <unistd.h> /* for sleep() */

int main() {
    while (1) {
        printf("2000n");
        sleep(1);
    }
    return 0;
}

To simulate the program that I will be using, which constantly takes readings from a sensor. Then I'm trying to read the output (in this case "2000") from the C program with subprocess in Python:

#!/usr/bin/python
import subprocess

process = subprocess.Popen("./main", stdout=subprocess.PIPE)
while True:
    for line in iter(process.stdout.readline, ''):
        print line,

but this is not working. From adding print statements, I can see that it runs the .Popen line and then blocks at for line in iter(process.stdout.readline, ''): until I press Ctrl-C.

Why is this? This is exactly the code most examples I've seen use, and yet it does not read the output.

Is there a way of making it run only when there is something to be read?

Answers

It is a block buffering issue.

What follows is a version of my answer to the question Python: read streaming input from subprocess.communicate(), extended for your case.

Fix the stdout buffer in the C program directly

stdio-based programs are, as a rule, line buffered if they are running interactively in a terminal, and block buffered when their stdout is redirected to a pipe. In the latter case, you won't see new lines until the buffer overflows or is flushed.

To avoid calling fflush() after each printf() call, you could force line-buffered output by calling this at the very beginning of the C program:

setvbuf(stdout, (char *) NULL, _IOLBF, 0); /* make line buffered stdout */

In this case, the buffer is flushed as soon as a newline is printed.

Or fix it without modifying the source of the C program

There is the stdbuf utility that allows you to change the buffering type without modifying the source code, e.g.:

from subprocess import Popen, PIPE

process = Popen(["stdbuf", "-oL", "./main"], stdout=PIPE, bufsize=1)
for line in iter(process.stdout.readline, b''):
    print line,
process.communicate() # close process' stream, wait for it to exit

There are also other utilities available, see Turn off buffering in pipe.
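
For example, the unbuffer command from the expect package serves the same purpose as stdbuf here. A sketch, assuming unbuffer is installed (it forces line buffering by running the child under a pty):

from subprocess import Popen, PIPE

# "unbuffer" ships with the expect package and is assumed to be installed.
process = Popen(["unbuffer", "./main"], stdout=PIPE)
for line in iter(process.stdout.readline, b''):
    print(line.decode(), end='')  # Python 3 syntax
process.communicate()  # close process' stream, wait for it to exit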

Or use pseudo-TTY

To trick the subprocess into thinking that it is running interactively, you could use the pexpect module or its analogs; for code examples that use the pexpect and pty modules, see Python subprocess readlines() hangs. Here's a variation on the pty example provided there (it should work on Linux):

#!/usr/bin/env python
import os
import pty
import sys
from select import select
from subprocess import Popen, STDOUT

master_fd, slave_fd = pty.openpty()  # provide tty to enable line buffering
process = Popen("./main", stdin=slave_fd, stdout=slave_fd, stderr=STDOUT,
                bufsize=0, close_fds=True)
timeout = .1 # ugly but otherwise `select` blocks on process' exit
# code is similar to _copy() from pty.py
with os.fdopen(master_fd, 'r+b', 0) as master:
    input_fds = [master, sys.stdin]
    while True:
        fds = select(input_fds, [], [], timeout)[0]
        if master in fds: # subprocess' output is ready
            data = os.read(master_fd, 512) # <-- doesn't block, may return less
            if not data: # EOF
                input_fds.remove(master)
            else:
                os.write(sys.stdout.fileno(), data) # copy to our stdout
        if sys.stdin in fds: # got user input
            data = os.read(sys.stdin.fileno(), 512)
            if not data:
                input_fds.remove(sys.stdin)
            else:
                master.write(data) # copy it to subprocess' stdin
        if not fds: # timeout in select()
            if process.poll() is not None: # subprocess ended
                # and no output is buffered <-- timeout + dead subprocess
                assert not select([master], [], [], 0)[0] # race is possible
                os.close(slave_fd) # subprocess doesn't need it anymore
                break
rc = process.wait()
print("subprocess exited with status %d" % rc)

Or use pty via pexpect

pexpect wraps pty handling into a higher-level interface:

#!/usr/bin/env python
import pexpect

child = pexpect.spawn("./main")
for line in child:
    print line,
child.close()

Q: Why not just use a pipe (popen())? explains why a pseudo-TTY is useful.

answered by Manmay

As suggested by Mark Ransom, I found the right encoding for that problem. The encoding was "ISO-8859-1", so replacing open("u.item", encoding="utf-8") with open("u.item", encoding="ISO-8859-1") will solve the problem.
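
A minimal sketch of the fix (Python 3 shown; on Python 2 you would use io.open, since the built-in open() there takes no encoding argument):

# u.item is the file from the original question; it is Latin-1 encoded.
with open("u.item", encoding="ISO-8859-1") as f:
    for line in f:
        print(line, end="")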

answered by Joegramming

You could always subclass list and add a "frozen" flag which blocks __setitem__ from doing anything:

class freezablelist(list):
    def __init__(self,*args,**kwargs):
        list.__init__(self, *args)
        self.frozen = kwargs.get('frozen', False)

    def __setitem__(self, i, y):
        if self.frozen:
            raise TypeError("can't modify frozen list")
        return list.__setitem__(self, i, y)

    def __setslice__(self, i, j, y):
        if self.frozen:
            raise TypeError("can't modify frozen list")
        return list.__setslice__(self, i, j, y)

    def freeze(self):
        self.frozen = True

    def thaw(self):
        self.frozen = False

Then playing with it:

>>> from freeze import freezablelist as fl
>>> a = fl([1,2,3])
>>> a[1] = 'chicken'
>>> a.freeze()
>>> a[1] = 'tuna'
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "freeze.py", line 10, in __setitem__
    raise TypeError("can't modify frozen list")
TypeError: can't modify frozen list
>>> a[1:1] = 'tuna'
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "freeze.py", line 16, in __setslice__
    raise TypeError("can't modify frozen list")
TypeError: can't modify frozen list
>>>
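
Note that __setslice__ only exists on Python 2. On Python 3, slice assignment also goes through __setitem__, which then receives a slice object, so a rough Python 3 equivalent could look like this (a sketch with names of my choosing; it still doesn't guard append, extend, or del):

class FreezableList(list):
    """List that can be frozen to reject item and slice assignment."""

    def __init__(self, *args, frozen=False):
        super().__init__(*args)
        self.frozen = frozen

    def __setitem__(self, index, value):
        # On Python 3, both a[i] = x and a[i:j] = xs end up here;
        # `index` is an int or a slice object.
        if self.frozen:
            raise TypeError("can't modify frozen list")
        super().__setitem__(index, value)

    def freeze(self):
        self.frozen = True

    def thaw(self):
        self.frozen = False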

answered by Student

The problem is that bash doesn't respond to CTRL-C when it is not connected to a terminal. Switching to SIGHUP or SIGTERM seems to do the trick:

cmd = ["bash", 'childProc.sh']
p = subprocess.Popen(cmd, stdout=subprocess.PIPE, 
                          stderr=subprocess.STDOUT, 
                          close_fds=True)
time.sleep(3)
print 'killing pid', p.pid
os.kill(p.pid, signal.SIGTERM)
print "timed out and killed child, collecting what output exists so far"
out  = p.communicate()[0]
print "got it", out

Outputs:

killing pid 5844
timed out and killed child, collecting what output exists so far
got it output line 0
output line 1
output line 2
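
On Python 3 you can get the same effect without os.kill(), using Popen.terminate() (which also sends SIGTERM) together with communicate() and a timeout. A sketch, reusing childProc.sh from above:

import subprocess

p = subprocess.Popen(["bash", "childProc.sh"],
                     stdout=subprocess.PIPE,
                     stderr=subprocess.STDOUT)
try:
    out, _ = p.communicate(timeout=3)   # wait up to 3 seconds
except subprocess.TimeoutExpired:
    p.terminate()             # sends SIGTERM, like os.kill() above
    out, _ = p.communicate()  # collect whatever output exists so far
print("got it", out.decode())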

answered by user435216

There are several concerns here:

  • You are creating a new event loop on import, once, but closing the event loop in your view. Don't close the loop at all; a second request will now fail because the loop is closed.

  • The asyncio event loop is not thread safe and should not be shared between threads. The vast majority of Flask deployments use threads to handle incoming requests. Your code carries echoes of how this should be handled instead, but unfortunately it is not the correct approach. E.g. asyncio.get_child_watcher().attach_loop(eventLoop) is mostly redundant, because eventLoop = asyncio.new_event_loop(), if run on the main thread, already does exactly that.

    This is the main candidate for the issues you are seeing.

  • Your code assumes that the executable is in fact present and executable. You should be handling OSError exceptions (and subclasses), because an unqualified s.py would only work if it is made executable, starts with a #! shebang line and is found on the PATH. It won't work just because it is in the same directory, nor would you want to rely on the current working directory anyway.

  • Your code assumes that the process closes stdout at some point. If the subprocess never closes stdout (something that happens automatically when the process exits) then your async for line in process.stdout: loop will wait forever too. Consider adding timeouts to the code to avoid getting blocked on a faulty subprocess.

There are two sections in the Python asyncio documentation that you really would want to read when using asyncio subprocesses in a multi-threaded application:

  • The Concurrency and Multithreading section, explaining that "almost all asyncio objects are not thread safe". You don't want to add tasks to the loop from other threads directly; you want to either use an event loop per thread, or use the asyncio.run_coroutine_threadsafe() function to run a coroutine on a loop in a specific thread.

  • For Python versions up to 3.7, you also need to read the Subprocess and Threads section, because up until that version asyncio used a non-blocking os.waitpid(-1, os.WNOHANG) call to track child state and relied on signal handling (which can only be done on the main thread). Python 3.8 removed this restriction (by adding a new child watcher implementation that uses a blocking per-process os.waitpid() call in a separate thread, at the expense of extra memory).

    You don't have to stick to the default child watcher strategy, however. You can use EventLoopPolicy.set_child_watcher() and pass in a different process watcher instance. In practice that means backporting the 3.8 ThreadedChildWatcher implementation.

For your use case, there really is no need to run a new event loop per thread. Run a single loop, in a separate thread, as needed. If you use a loop in a separate thread, then depending on your Python version you may need to have a running loop on the main thread as well, or use a different process watcher. Generally speaking, running an asyncio loop on the main thread in a WSGI server is not going to be easy or even possible.

So you need to run a loop, permanently, in a separate thread, and you need to use a child process watcher that works without a main thread loop. Here is an implementation for just that, and this should work for Python versions 3.6 and newer:

import asyncio
import itertools
import logging
import time
import threading

try:
    # Python 3.8 or newer has a suitable process watcher
    asyncio.ThreadedChildWatcher
except AttributeError:
    # backport the Python 3.8 threaded child watcher
    import os
    import warnings

    # Python 3.7 preferred API
    _get_running_loop = getattr(asyncio, "get_running_loop", asyncio.get_event_loop)

    class _Py38ThreadedChildWatcher(asyncio.AbstractChildWatcher):
        def __init__(self):
            self._pid_counter = itertools.count(0)
            self._threads = {}

        def is_active(self):
            return True

        def close(self):
            pass

        def __enter__(self):
            return self

        def __exit__(self, exc_type, exc_val, exc_tb):
            pass

        def __del__(self, _warn=warnings.warn):
            threads = [t for t in list(self._threads.values()) if t.is_alive()]
            if threads:
                _warn(
                    f"{self.__class__} has registered but not finished child processes",
                    ResourceWarning,
                    source=self,
                )

        def add_child_handler(self, pid, callback, *args):
            loop = _get_running_loop()
            thread = threading.Thread(
                target=self._do_waitpid,
                name=f"waitpid-{next(self._pid_counter)}",
                args=(loop, pid, callback, args),
                daemon=True,
            )
            self._threads[pid] = thread
            thread.start()

        def remove_child_handler(self, pid):
            # asyncio never calls remove_child_handler() !!!
            # The method is no-op but is implemented because
            # abstract base class requires it
            return True

        def attach_loop(self, loop):
            pass

        def _do_waitpid(self, loop, expected_pid, callback, args):
            assert expected_pid > 0

            try:
                pid, status = os.waitpid(expected_pid, 0)
            except ChildProcessError:
                # The child process is already reaped
                # (may happen if waitpid() is called elsewhere).
                pid = expected_pid
                returncode = 255
                logger.warning(
                    "Unknown child process pid %d, will report returncode 255", pid
                )
            else:
                if os.WIFSIGNALED(status):
                    returncode = -os.WTERMSIG(status)
                elif os.WIFEXITED(status):
                    returncode = os.WEXITSTATUS(status)
                else:
                    returncode = status

                if loop.get_debug():
                    logger.debug(
                        "process %s exited with returncode %s", expected_pid, returncode
                    )

            if loop.is_closed():
                logger.warning("Loop %r that handles pid %r is closed", loop, pid)
            else:
                loop.call_soon_threadsafe(callback, pid, returncode, *args)

            self._threads.pop(expected_pid)

    # add the watcher to the loop policy
    asyncio.get_event_loop_policy().set_child_watcher(_Py38ThreadedChildWatcher())

__all__ = ["EventLoopThread", "get_event_loop", "stop_event_loop", "run_coroutine"]

logger = logging.getLogger(__name__)

class EventLoopThread(threading.Thread):
    loop = None
    _count = itertools.count(0)

    def __init__(self):
        name = f"{type(self).__name__}-{next(self._count)}"
        super().__init__(name=name, daemon=True)

    def __repr__(self):
        loop, r, c, d = self.loop, False, True, False
        if loop is not None:
            r, c, d = loop.is_running(), loop.is_closed(), loop.get_debug()
        return (
            f"<{type(self).__name__} {self.name} id={self.ident} "
            f"running={r} closed={c} debug={d}>"
        )

    def run(self):
        self.loop = loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)

        try:
            loop.run_forever()
        finally:
            try:
                shutdown_asyncgens = loop.shutdown_asyncgens()
            except AttributeError:
                pass
            else:
                loop.run_until_complete(shutdown_asyncgens)
            loop.close()
            asyncio.set_event_loop(None)

    def stop(self):
        loop, self.loop = self.loop, None
        if loop is None:
            return
        loop.call_soon_threadsafe(loop.stop)
        self.join()

_lock = threading.Lock()
_loop_thread = None

def get_event_loop():
    global _loop_thread
    if _loop_thread is None:
        with _lock:
            if _loop_thread is None:
                _loop_thread = EventLoopThread()
                _loop_thread.start()
                # give the thread up to a second to produce a loop
                deadline = time.time() + 1
                while not _loop_thread.loop and time.time() < deadline:
                    time.sleep(0.001)

    return _loop_thread.loop

def stop_event_loop():
    global _loop_thread
    with _lock:
        if _loop_thread is not None:
            _loop_thread.stop()
            _loop_thread = None

def run_coroutine(coro):
    return asyncio.run_coroutine_threadsafe(coro, get_event_loop())

The above is the same general 'run async with Flask' solution as I posted for Make a Python asyncio call from a Flask route, but with the addition of the ThreadedChildWatcher backport.

You can then use the loop returned by get_event_loop() to run child processes, by calling the run_coroutine() helper defined above (a thin wrapper around asyncio.run_coroutine_threadsafe()):

import asyncio
import concurrent.futures
import locale
import logging

logger = logging.getLogger(__name__)


def get_command_output(cmd, timeout=None):
    encoding = locale.getpreferredencoding(False)

    async def run_async():
        try:
            process = await asyncio.create_subprocess_exec(
                cmd, stdout=asyncio.subprocess.PIPE)
        except OSError:
            logger.exception("Process %s could not be started", cmd)
            return
        
        async for line in process.stdout:
            line = line.decode(encoding)
            # TODO: actually do something with the data.
            print(line, flush=True)

        # stdout reached EOF; the process has usually exited by now,
        # so only kill it if it is somehow still running.
        if process.returncode is None:
            process.kill()
        returncode = await process.wait()
        logger.debug("Process for %s exited with returncode %s", cmd, returncode)
        return returncode

    future = run_coroutine(run_async())
    result = None
    try:
        result = future.result(timeout)
    except concurrent.futures.TimeoutError:
        # run_coroutine_threadsafe() returns a concurrent.futures.Future,
        # so result() raises concurrent.futures.TimeoutError on timeout.
        logger.warning('The child process took too long, cancelling the task...')
        future.cancel()
    except Exception:
        logger.exception('The child process raised an exception')
    return result

Note that the above function takes an optional timeout, in seconds: the maximum amount of time to wait for the subprocess to complete.
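
For instance, a hypothetical Flask view tying it all together (the route and the ./main command are illustrative, not part of the original answer):

from flask import Flask

app = Flask(__name__)

@app.route("/run")
def run():
    # Runs the subprocess on the shared background event loop and waits
    # at most 10 seconds for it to complete.
    returncode = get_command_output("./main", timeout=10)
    if returncode is None:
        return "subprocess timed out or failed", 500
    return "subprocess exited with status %d" % returncode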

answered by jezrael