Communication between OpenCV and VPython is failing

10 views
Skip to first unread message

Mrt 122

unread,
May 24, 2024, 5:39:36 AMMay 24
to VPython-users
from multiprocessing import Pipe, Process

import cv2
import mediapipe as mp
from vpython import *


# Hand tracking function
def hand_tracking(child_conn):
    mp_drawing = mp.solutions.drawing_utils
    mp_hands = mp.solutions.hands

    cap = cv2.VideoCapture(0)

    with mp_hands.Hands(
        min_detection_confidence=0.8,
        min_tracking_confidence=0.5,
        model_complexity=0,
    ) as hands:
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break

            image = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            image = cv2.flip(image, 1)
            image.flags.writeable = False
            results = hands.process(image)
            image.flags.writeable = True
            image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)

            if results.multi_hand_landmarks:
                for hand in results.multi_hand_landmarks:
                    mp_drawing.draw_landmarks(
                        image,
                        hand,
                        mp_hands.HAND_CONNECTIONS,
                        mp_drawing.DrawingSpec(
                            color=(121, 22, 76), thickness=2, circle_radius=4
                        ),
                        mp_drawing.DrawingSpec(
                            color=(250, 44, 250), thickness=2, circle_radius=2
                        ),
                    )

                    thumb_tip = hand.landmark[mp_hands.HandLandmark.THUMB_TIP]
                    thumb_ip = hand.landmark[mp_hands.HandLandmark.THUMB_IP]
                    if thumb_ip.y < thumb_tip.y:
                        print("Thumbs up detected")
                        child_conn.send("thumbs_up")

            cv2.imshow("Hand Tracking", image)
            if cv2.waitKey(10) & 0xFF == ord("q"):
                break

    cap.release()
    cv2.destroyAllWindows()
    child_conn.close()


# VPython visualization function
def vpython_visualization(parent_conn):
    vp_canvas = canvas(title="3D Viewer", width=1080, height=720)
    box_object = box(size=vector(2, 2, 2), color=color.blue)  # Create a box

    if parent_conn.poll():
        gesture = parent_conn.recv()
        if gesture == "thumbs_up":
            box_object.color = color.green

    while True:
        pass
    parent_conn.close()


if __name__ == "__main__":
    parent_conn, child_conn = Pipe()

    # Start the hand tracking process
    hand_tracking_process = Process(target=hand_tracking, args=(child_conn,))
    hand_tracking_process.start()

    # Start the VPython visualization process
    vpython_visualization_process = Process(
        target=vpython_visualization, args=(parent_conn,)
    )
    vpython_visualization_process.start()

    # Wait for both processes to complete
    hand_tracking_process.join()
    vpython_visualization_process.join()

I am trying to make it so when I do "thumbs up" the color of the cube changes but nothing happens, why ? 

John

unread,
May 24, 2024, 11:08:09 AMMay 24
to VPython-users
I modified some multiprocessing code from this article and got this code to work. 


You can try running the attached file  multiprocess_vpy2.py from the command line.

   python  multiprocess_vpy2.py

It will turn the box green when it receives a "thumbs_up" message.


import multiprocessing
from vpython import *

def sender(conn, msgs):
    """
    function to send messages to other end of pipe
    """
    for msg in msgs:
        conn.send(msg)
        print("Sent the message: {}".format(msg))
    conn.close()

def receiver(conn):
    """
    function to print the messages received from other
    end of pipe
    """
    scene = canvas()
    b = box()

    while 1:
        msg = conn.recv()
        if msg == "thumbs_up":        
            b.color = color.green
        if msg == "END":
            pass
            #break
        print("Received the message: {}".format(msg))
       
if __name__ == "__main__":

    #scene = canvas()
    #b = box()

    # messages to be sent
    msgs = ["hello", "hey", "hru?", "thumbs_up", "END"]

    # creating a pipe
    parent_conn, child_conn = multiprocessing.Pipe()

    # creating new processes
    p1 = multiprocessing.Process(target=sender, args=(parent_conn,msgs))
    p2 = multiprocessing.Process(target=receiver, args=(child_conn,))

    # running processes
    p1.start()
    p2.start()

    # wait until processes finish
    p1.join()
    p2.join()

    while True:
        pass

multiprocess_vpy2.py
Reply all
Reply to author
Forward
0 new messages