Streaming solver output via WebSocket in near-real-time


JUAN CAMILO CAMARGO BERRUECO

Jan 24, 2024, 11:03:07 PM
to Pyomo Forum

I'm building an optimization model with Pyomo and trying to expose it via FastAPI (plain FastAPI or Celery). In the process I'm trying to capture the solver output and stream it to the user in near-real-time so they can inspect it. To stream this data, I'm trying to use WebSockets.

I've been using capture_output(), but it doesn't seem to work for this.
Any hints or recommendations on how to handle it?

If I run uvicorn main:app --reload, open http://localhost:8000/, and push the Send button, the model starts to solve and I'm able to stream the solver log into the template, but this only happens once the solving process has finished. For a much larger model that takes a long time to solve, I want to watch the logs in as near-real-time as possible.

Any hints on this, or any other approach I could take?

An example of this app is as follows:
Structure

|--main.py
|--model.py
|--template.py


model.py
import pyomo.environ as pyo

model = pyo.ConcreteModel()

model.t = pyo.RangeSet(10000)
model.x = pyo.Var(model.t, domain=pyo.NonNegativeReals)

def constraint(model):
    return sum(model.x[t] for t in model.t) >= 10


model.c1 = pyo.Constraint(rule=constraint)

model.obj = pyo.Objective(
    expr=sum(model.x[t] for t in model.t),
    sense=pyo.minimize,
)

# main.py imports the model under this name
instance = model


main.py

import logging

from fastapi import FastAPI, WebSocket
from fastapi.responses import HTMLResponse
from pyomo.common.tee import capture_output
from pyomo.environ import SolverFactory

from model import instance
from template import template

logger = logging.getLogger("simple-example-logger")

app = FastAPI()


@app.get("/")
async def get():
    return HTMLResponse(template)


@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    # --- Solve Model ---
    while True:
        data = await websocket.receive_text()
        await websocket.send_text("Optimizing...")
        solver = SolverFactory("cbc")
        with capture_output() as LOGER:
            solver_logger = logging.StreamHandler(LOGER)
            logger.addHandler(solver_logger)
            solver.solve(instance, tee=True)
            text = solver_logger.stream.getvalue()
        # the captured log is only sent once solve() has returned
        await websocket.send_text(text)
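
One idea I have been toying with (just a sketch, I have not confirmed this is the intended use of capture_output()) is to hand capture_output() a writable stream of my own, so every chunk is seen as the solver writes it instead of being read back only after solve() returns. It assumes capture_output() accepts a writable stream as its first argument (by default it captures into an internal StringIO); ForwardingStream is just an illustrative name.

# Sketch only: forward each chunk as the solver writes it, instead of reading
# the whole log after solve() returns. Assumes capture_output() accepts a
# writable stream as its first argument; ForwardingStream is a made-up helper.
from io import TextIOBase

from pyomo.common.tee import capture_output
from pyomo.environ import SolverFactory

from model import instance


class ForwardingStream(TextIOBase):
    def __init__(self, callback):
        self._callback = callback

    def write(self, s):
        self._callback(s)  # called for every chunk the solver emits
        return len(s)


chunks = []
solver = SolverFactory("cbc")
with capture_output(ForwardingStream(chunks.append)):
    solver.solve(instance, tee=True)
# in the real app the callback would push each chunk toward the WebSocket
# instead of appending it to a list

Even if that works, the blocking solve() call still ties up the event loop, so nothing could actually be pushed over the WebSocket while it runs.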


template.py
This is just a .py file containing the HTML string for the page that talks to the WebSocket:

template = """
<!DOCTYPE html>
<html>
<head>
<title>Chat</title>
</head>
<body>
<h1>WebSocket Chat</h1>
<form action="" onsubmit="sendMessage(event)">
<input type="text" id="messageText" autocomplete="off"/>
<button>Send</button>
</form>
<ul id='messages'>
</ul>
<script>
var ws = new WebSocket("ws://localhost:8000/ws");
ws.onmessage = function(event) {
var messages = document.getElementById('messages')
var message = document.createElement('li')
var content = document.createTextNode(event.data)
message.appendChild(content)
messages.appendChild(message)
};
function sendMessage(event) {
var input = document.getElementById("messageText")
ws.send(input.value)
input.value = ''
event.preventDefault()
}
</script>
</body>
</html>
"""


JUAN CAMILO CAMARGO BERRUECO

Feb 3, 2024, 5:26:41 PM
to Pyomo Forum

I finally managed to handle this. I'm not sure it's the best approach, but it works.

Thanks to @Chris for the comment on this post.

I also used this answer to achieve the streaming.

I decide to "stream" a "batch" of logs to avoid overhead since some solver (e.g., scip) generate a lot of verbose while optimizing, Then I stream a batch every 5 seconds.

I wrote a class that replaces sys.stdout. Its _write_by_batches method creates a new event loop so the batch can be sent asynchronously through the WebSocket; a new event loop is created each time a log batch is streamed. The batching is there to avoid the overhead of streaming every single log line.

class MyStdOut(TextIOBase):
    def __init__(self, ws: WebSocket, orig_stdout=None):
        self._ws = ws
        self.orig_stdout = orig_stdout
        self._last_update = datetime.now()
        self._batch = ""

    def write(self, s):
        # new_string = str(s).replace("\n", "/n")
        self._stream_solver_log(
            current_time=datetime.now(),
            msg=s,
        )
        if self.orig_stdout:
            self.orig_stdout.write(s)

    def _write_by_batches(self, s):
        loop = asyncio.new_event_loop()
        loop.run_until_complete(
            self._ws.send_text(s)
        )

    def _stream_solver_log(self, current_time: datetime, msg: str):
        if (current_time - self._last_update).total_seconds() >= 5:
            self._write_by_batches(self._batch)
            self._batch = ""
            self._last_update = current_time
        self._batch = f"{self._batch}\n{msg}"

Then, in the WebSocket endpoint, I get the running loop and run solver.solve(instance, tee=True) through loop.run_in_executor, so the blocking solve doesn't block the event loop.

The whole main.py script is as follows:

main.py


from io import TextIOBase
import asyncio
from datetime import datetime
import sys

from fastapi import FastAPI, WebSocket
from fastapi.responses import HTMLResponse
from pyomo.environ import SolverFactory

from model import instance
from template import template


class MyStdOut(TextIOBase):
    def __init__(self, ws: WebSocket, orig_stdout=None):
        self._ws = ws
        self.orig_stdout = orig_stdout
        self._last_update = datetime.now()
        self._batch = ""

    def write(self, s):
        # new_string = str(s).replace("\n", "/n")
        self._stream_solver_log(
            current_time=datetime.now(),
            msg=s,
        )
        if self.orig_stdout:
            self.orig_stdout.write(s)

    def _write_by_batches(self, s):
        loop = asyncio.new_event_loop()
        loop.run_until_complete(
            self._ws.send_text(s)
        )

    def _stream_solver_log(self, current_time: datetime, msg: str):
        if (current_time - self._last_update).total_seconds() >= 5:
            self._write_by_batches(self._batch)
            self._batch = ""
            self._last_update = current_time
        self._batch = f"{self._batch}\n{msg}"


def solve_model(solver, model, options):
    return solver.solve(model, **options)


app = FastAPI()


@app.get("/")
async def get():
    return HTMLResponse(template)


@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    # --- Solve Model ---
    solver = SolverFactory("scip")
    sys.stdout = MyStdOut(ws=websocket, orig_stdout=sys.stdout)
    while True:
        data = await websocket.receive_text()
        await websocket.send_text("Optimizing...")
        loop = asyncio.get_running_loop()
        await asyncio.gather(*[
            loop.run_in_executor(None, solve_model, *[solver, instance, {"tee": True}]),
        ])
        await websocket.send_text(f"{sys.stdout._batch} \n\n\nOptimization finished")