import sys, time
import numpy as np
from PyQt5 import QtWidgets
from PyQt5.QtCore import QTimer
import pyqtgraph as pg
from pyqtgraph.Qt import QtCore, QtGui
from pyqtgraph import GraphicsLayoutWidget
from multiprocessing import Process, Pipe, Event
class PyQtGraphTest(GraphicsLayoutWidget):
    """Plot window that redraws a single curve from data read off a pipe.

    A timer started with a 16 ms interval calls ``update_plot``, which polls
    the receiving connection and, when a payload is waiting, hands it to the
    curve.
    """

    def __init__(self, pipe_head, pipe_tail):
        """Create the window, its plot curve and the polling timer.

        Args:
            pipe_head: Connection kept on the instance; this class never
                sends on or reads from it.
            pipe_tail: Connection polled by ``update_plot`` for new plot data.
        """
        super().__init__()
        self.pipe_head = pipe_head
        self.pipe_tail = pipe_tail

        # Window and plot area: one plot holding one (initially empty) curve,
        # auto-ranging on both axes.
        self.setWindowTitle('Test pyqtgraph paint signals')
        self.resize(640, 400)
        self.plot = self.addPlot()
        self.spectrum = self.plot.plot()
        self.plot.enableAutoRange(pg.ViewBox.XYAxes)

        # Timer that drives the periodic check for new data. It is stored on
        # the instance so it stays referenced for the lifetime of the window.
        poll_timer = QTimer()
        poll_timer.timeout.connect(self.update_plot)
        poll_timer.start(16)
        self.update_timer = poll_timer

    def update_plot(self):
        """Timer slot: redraw the curve if a new payload is waiting in the pipe."""
        if not self.pipe_tail.poll():
            # Nothing has arrived since the last tick; leave the curve as is.
            return
        payload = self.pipe_tail.recv()
        self.spectrum.setData(payload)
def start_receiver(pipe_head, pipe_tail):
    """Process entry point for the GUI side.

    Creates the QApplication, shows a ``PyQtGraphTest`` window wired to the
    two connections, and then blocks inside the Qt event loop until it exits.

    Args:
        pipe_head: Connection forwarded unchanged to ``PyQtGraphTest``.
        pipe_tail: Connection forwarded unchanged to ``PyQtGraphTest``.
    """
    qt_app = QtWidgets.QApplication(sys.argv)
    # Keep a local reference so the window object lives as long as the loop.
    plot_window = PyQtGraphTest(pipe_head=pipe_head, pipe_tail=pipe_tail)
    plot_window.show()
    qt_app.exec_()
# Routine to acquire and serve data
# This might be a camera driver, notifying when a new frame is available
def start_sender(pipe_head, pipe_tail, closing_event):
    """Produce a moving two-component cosine trace and send it down the pipe.

    Runs until ``closing_event`` is set. A new frame is sent only when
    ``pipe_tail.poll()`` reports that nothing is waiting to be read, so at
    most one unread frame is ever queued. The loop sleeps 10 ms per pass
    whether or not a frame was sent.

    Args:
        pipe_head: Connection the frames are sent on.
        pipe_tail: Connection at the other end of the pipe; it is only polled
            here, never read from.
        closing_event: Object exposing ``is_set()``; the loop ends once it
            returns true.

    NOTE(review): ``pipe_tail`` is also polled and read by the plotting
    process. Sharing one connection end between processes looks intentional
    here (it is the back-pressure signal), but confirm it is acceptable on
    every target platform.
    """
    n = 1600

    # The phase ramps never change between frames, so build them once.
    # ``np.arange(n) * step`` is used instead of ``np.arange(0, stop, step)``
    # because a float step can make arange return n + 1 samples; the values
    # are the same (i * step) but the length is now guaranteed to be n.
    fast_phase = np.arange(n) * (10*np.pi/n)
    slow_phase = np.arange(n) * (4*np.pi/n)

    while not closing_event.is_set():
        # Try not to block up the pipe: skip this frame while the previous
        # one is still unread.
        if not pipe_tail.poll():
            # One timestamp per frame so both components share a time base.
            now = time.monotonic()
            data = np.cos(fast_phase - 9*now) + np.cos(slow_phase + 4*now)
            pipe_head.send(data)
        time.sleep(0.01)
if __name__ == '__main__':
    # Event to signal closing of the plot window to the other process
    # (Possibly could instead pass data back the other way through the pipe)
    closing_event = Event()

    # Pipe for data transport; both child processes receive both ends.
    pipe_head, pipe_tail = Pipe()

    print("Starting reciever...")
    receiver = Process(target=start_receiver, args=(pipe_head, pipe_tail))
    receiver.start()

    print("Starting sender...")
    sender = Process(target=start_sender, args=(pipe_head, pipe_tail, closing_event))
    sender.start()

    try:
        # Blocks until the plot window's process has exited.
        print("Waiting for receiver to exit...")
        receiver.join()
    finally:
        # Always tell the sender to stop, even if the wait above was
        # interrupted; otherwise its loop would keep running with nothing
        # left to signal it.
        print("Waiting for sender to exit...")
        closing_event.set()
        sender.join()

    print("Processes finished.")
    sys.exit(0)