I'm trying to create a pipeline in which an AppSource feeds data into the pipeline in order to encode as H265, package in RTP and then send out over a UDPSink.
However, when this code is executed, I get one NEED_DATA callback, then one ENOUGH_DATA callback, and then the pipeline seems to hang. I'm not sure whether the issue is in my pipeline string or in my implementation of the AppSource in question.
For further reference, open() and processFrame() are called from outside this class.
Code is below:
package com.javashell.video.egressors;
import com.javashell.video.VideoEgress;
import com.sun.jna.Platform;
import com.sun.jna.platform.win32.Kernel32;
import java.awt.Dimension;
import java.awt.image.BufferedImage;
import java.awt.image.DataBuffer;
import java.awt.image.DataBufferByte;
import java.io.File;
import java.nio.ByteOrder;
import java.util.HashMap;
import java.util.stream.Stream;
import org.freedesktop.gstreamer.Bin;
import org.freedesktop.gstreamer.Buffer;
import org.freedesktop.gstreamer.Bus;
import org.freedesktop.gstreamer.Caps;
import org.freedesktop.gstreamer.FlowReturn;
import org.freedesktop.gstreamer.Format;
import org.freedesktop.gstreamer.Gst;
import org.freedesktop.gstreamer.Pipeline;
import org.freedesktop.gstreamer.StateChangeReturn;
import org.freedesktop.gstreamer.Version;
import org.freedesktop.gstreamer.elements.AppSrc;
import org.opencv.core.Core;
public class GStreamerEgressor extends VideoEgress {
private AppSrc component;
private BufferedImage bufFrame;
private Pipeline pipeline;
public GStreamerEgressor(Dimension resolution) {
super(resolution);
System.loadLibrary(Core.NATIVE_LIBRARY_NAME);
configurePaths();
Gst.init(Version.BASELINE, "jVid");
component = new AppSrc("GStreamerEgressIngest");
component.set("emit-signals", true);
component.set("stream-type", AppSrc.StreamType.STREAM);
AppSourceListener listener = new AppSourceListener();
component.connect((AppSrc.NEED_DATA) listener);
component.connect((AppSrc.ENOUGH_DATA) listener);
// JNA creates ByteBuffer using native byte order, set masks according to that.
StringBuilder caps = new StringBuilder("video/x-raw,pixel-aspect-ratio=1/1,");
caps.append(ByteOrder.nativeOrder() == ByteOrder.LITTLE_ENDIAN ? "format=BGRx" : "format=xRGB");
component.setCaps(new Caps(caps.toString()));
Bin bin = Gst.parseBinFromDescription(
"x265enc tune=zerolatency bitrate=1000 ! h265parse ! rtph265pay config-interval=1 ! udpsink host=238.0.0.240 port=8989",
true);
pipeline = new Pipeline();
pipeline.addMany(component, bin);
Pipeline.linkMany(component, bin);
}
@Override
public BufferedImage processFrame(BufferedImage frame) {
this.bufFrame = frame;
return frame;
}
@Override
public boolean open() {
var ret =
pipeline.play();
System.out.println(
ret.name());
return true;
}
@Override
public boolean close() {
pipeline.stop();
return true;
}
private class AppSourceListener implements AppSrc.NEED_DATA, AppSrc.ENOUGH_DATA {
private boolean enough = false;
@Override
public void needData(AppSrc elem, int arg1) {
System.out.println("Request " + arg1);
// Spin until first frame populates
while (bufFrame == null)
try {
Thread.sleep(10);
} catch (Exception e) {
e.printStackTrace();
}
// Continue sending data until "ENOUGH" signal is received
while (enough == false) {
DataBuffer currentBuf = bufFrame.getRaster().getDataBuffer();
Buffer buffer = new Buffer(currentBuf.getSize());
if (currentBuf instanceof DataBufferByte) {
buffer.map(true).put(((DataBufferByte) currentBuf).getData());
}
System.out.println("Send " + currentBuf.getSize());
FlowReturn ret = elem.pushBuffer(buffer);
System.out.println(
ret.name());
}
enough = false;
}
@Override
public void enoughData(AppSrc arg0) {
System.out.println("ENOUGH");
enough = true;
}
}
static void configurePaths() {
if (Platform.isWindows()) {
String gstPath = System.getProperty("gstreamer.path", findWindowsLocation());
if (!gstPath.isEmpty()) {
String systemPath = System.getenv("PATH");
if (systemPath == null || systemPath.trim().isEmpty()) {
Kernel32.INSTANCE.SetEnvironmentVariable("PATH", gstPath);
} else {
Kernel32.INSTANCE.SetEnvironmentVariable("PATH", gstPath + File.pathSeparator + systemPath);
}
}
} else if (Platform.isMac()) {
String gstPath = System.getProperty("gstreamer.path", "/Library/Frameworks/GStreamer.framework/Libraries/");
if (!gstPath.isEmpty()) {
String jnaPath = System.getProperty("jna.library.path", "").trim();
if (jnaPath.isEmpty()) {
System.setProperty("jna.library.path", gstPath);
} else {
System.setProperty("jna.library.path", jnaPath + File.pathSeparator + gstPath);
}
}
}
}
static String findWindowsLocation() {
if (Platform.is64Bit()) {
return Stream
.of("GSTREAMER_1_0_ROOT_MSVC_X86_64", "GSTREAMER_1_0_ROOT_MINGW_X86_64",
"GSTREAMER_1_0_ROOT_X86_64")
.map(System::getenv).filter(p -> p != null).map(p -> p.endsWith("\\") ? p + "bin\\" : p + "\\bin\\")
.findFirst().orElse("");
} else {
return "";
}
}
}