#!/usr/bin/python
"""wxPython front end that measures audio loudness with ffmpeg's ebur128 filter.

Worker threads run ffmpeg on one file each and hand the "Summary" section of
its output back to the GUI thread through a pubsub "update" message; the main
window parses that summary and shows the values in its list control.

NOTE(review): several names used below (``self.list``, ``toAnalyze``,
``toAdjust``, ``width``, ``item``, ``intloud``, ``truepeak``, ``samplepeak``,
``adjusted_file``) are not defined anywhere in this file.  They are left
untouched and flagged where they are used; they must be supplied before the
corresponding code paths can run.
"""
import collections
import os
import re
import shutil
import subprocess
import sys
import time
from threading import Thread

import wx
from wx.lib.pubsub import setuparg1
from wx.lib.pubsub import pub as Publisher

# Pubsub topic on which worker threads report a finished measurement.
_UPDATE_TOPIC = "update"

# Range of the per-file progress gauges; setting a gauge to this value marks
# the file as done.
_GAUGE_RANGE = 50

# Text shown when a value cannot be found in ffmpeg's summary.
_NOT_AVAILABLE = "n/a"

# One entry per value extracted from the ebur128 summary:
# (pattern, column of the list control, slot in the per-file fileList record).
_SUMMARY_FIELDS = (
    (re.compile(r'LRA:\s(.+?) LU'), 7, 6),
    (re.compile(r'I:\s(.+?) LUFS'), 4, 3),
    (re.compile(r'True peak:\s+Peak:\s(.+?) dBFS'), 5, 4),
    (re.compile(r'Sample peak:\s+Peak:\s(.+?) dBFS'), 6, 5),
)


def _measure_loudness(path):
    """Run ffmpeg's ebur128 filter on *path* and return its summary text.

    The returned string starts at the first output line containing
    "Summary" and runs to the end of ffmpeg's output.  It is empty if no
    such line was printed.

    Raises OSError if the ffmpeg executable cannot be started.
    """
    proc = subprocess.Popen(
        ['ffmpeg', '-nostats', '-i', path,
         '-filter_complex', 'ebur128=peak=true+sample',
         '-f', 'null', '-'],
        bufsize=1,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        # Text mode: lines are str on Python 2 and 3 alike, so they can be
        # searched and concatenated without bytes/str mismatches.
        universal_newlines=True)

    summary_lines = []
    in_summary = False
    # Read until EOF rather than until the process exits: the summary is taken
    # from the tail of the output, and polling for exit can stop the loop while
    # those final lines are still unread.
    for line in iter(proc.stdout.readline, ''):
        if not in_summary and 'Summary' in line:
            in_summary = True
        if in_summary:
            summary_lines.append(line)
    proc.stdout.close()
    proc.wait()
    return ''.join(summary_lines)


class AnalysisThread(Thread):
    """Daemon thread that measures one audio file and reports the result.

    ``args`` is a sequence ``(file, index)``: the file to analyze and the
    index passed back with the result so the window can find the file's row.
    The thread starts itself on construction.
    """

    def __init__(self, args):
        Thread.__init__(self)
        self.file = args[0]
        self.index = args[1]
        self.daemon = True
        self.start()

    def run(self):
        """Measure the file, then publish the summary on the GUI thread."""
        summary = _measure_loudness(self.file)
        # wx.CallAfter defers the publish to the GUI thread.
        wx.CallAfter(Publisher.sendMessage, _UPDATE_TOPIC,
                     (self.file, summary, self.index))


class AdjustThread(Thread):
    """Daemon thread intended to adjust a file to EBU loudness standards.

    ``args`` is a sequence ``(file, index, IL, TP, SP)``.  The last three
    values are stored on the instance but not used by ``run`` as written.
    The thread starts itself on construction.
    """

    def __init__(self, args):
        Thread.__init__(self)
        self.file = args[0]
        self.index = args[1]
        self.IL = args[2]
        self.TP = args[3]
        self.SP = args[4]
        self.daemon = True
        self.start()

    def run(self):
        """Measure the adjusted file, then publish the summary on the GUI thread."""
        # NOTE(review): `adjusted_file` is not defined anywhere in this file;
        # the step that produces the adjusted file appears to be missing and
        # this line raises NameError until it is supplied.
        summary = _measure_loudness(adjusted_file)
        wx.CallAfter(Publisher.sendMessage, _UPDATE_TOPIC,
                     (self.file, summary, self.index))


class MainWindow(wx.Frame):
    """Main frame with "Analyze" and "Adjust" buttons and a result list."""

    # Per-file records keyed by file name.  The slots written in this file
    # are: 1 = done flag, 3 = integrated loudness, 4 = true peak,
    # 5 = sample peak, 6 = loudness range.
    # NOTE(review): class-level, so shared by every MainWindow instance.
    fileList = collections.OrderedDict()

    def __init__(self, parent, id, title):
        wx.Frame.__init__(self, parent, id, title, size=(900, 400))
        Publisher.subscribe(self.UpdateDisplay, _UPDATE_TOPIC)

        # Add "Analyze" and "Adjust" buttons to the main frame.
        panel = wx.Panel(self, -1)
        vbox = wx.BoxSizer(wx.VERTICAL)
        self.ana = wx.Button(panel, -1, 'Analyze', size=(100, -1))
        self.adj = wx.Button(panel, -1, 'Adjust', size=(100, -1))
        self.Bind(wx.EVT_BUTTON, self.OnAnalyze, id=self.ana.GetId())
        self.Bind(wx.EVT_BUTTON, self.OnAdjust, id=self.adj.GetId())
        vbox.Add(self.ana, 0, wx.ALL, 10)
        vbox.Add(self.adj, 0, wx.ALL, 10)
        # NOTE(review): self.list is never created in this file; it must be
        # assigned before this line.
        vbox.Add(self.list, 1, wx.EXPAND | wx.TOP, 3)
        vbox.Add((-1, 10))
        # vbox is the only sizer built here, so it is the one installed on
        # the panel (the original referenced an undefined `hbox`).
        panel.SetSizer(vbox)

        self.Centre()
        self.Show(True)

    def OnAnalyze(self, event):
        """Handle the "Analyze" button: start one AnalysisThread per file."""
        # NOTE(review): `toAnalyze` is not defined in this file; it is
        # iterated as (file, index) pairs.
        for (path, index) in toAnalyze:
            # Put the gauge already attached to column 2 into pulse mode.
            item = self.list.GetItem(index, 2)
            gauge = item.GetWindow()
            gauge.Pulse()
            # Launch the analysis thread.
            AnalysisThread(args=(path, index,))

    def OnAdjust(self, event):
        """Handle the "Adjust" button: start one AdjustThread per file."""
        # NOTE(review): `toAdjust`, `width`, `item`, `intloud`, `truepeak`
        # and `samplepeak` are not defined in this file.
        for (path, index) in toAdjust:
            gauge = wx.Gauge(self.list, -1, range=_GAUGE_RANGE,
                             size=(width, 15),
                             style=wx.GA_HORIZONTAL | wx.GA_SMOOTH)
            gauge.Pulse()  # shouldn't start this right away...
            item.SetWindow(gauge, wx.ALIGN_CENTRE)
            self.list.SetItem(item)
            # Launch the adjust thread.
            AdjustThread(args=(path, index, intloud, truepeak, samplepeak))

    def UpdateDisplay(self, msg):
        """Pubsub handler for "update": show a worker thread's result.

        ``msg.data`` is the ``(file, summary, index)`` tuple sent by the
        worker thread.
        """
        path, summary, index = msg.data[0], msg.data[1], msg.data[2]
        self.ProcessSummary(path, summary, index)
        # Set the row's gauge to its full range and flag the file as done.
        item = self.list.GetItem(index, 2)
        gauge = item.GetWindow()
        gauge.SetValue(_GAUGE_RANGE)
        self.fileList[path][1] = True

    def ProcessSummary(self, file, summary, i):
        """Extract the loudness values from *summary* and display them.

        For each value the captured text, or "n/a" when the pattern does not
        match, is written to row *i* of the list control and stored in the
        record ``self.fileList[file]``.
        """
        for pattern, column, slot in _SUMMARY_FIELDS:
            match = pattern.search(summary)
            value = match.group(1) if match is not None else _NOT_AVAILABLE
            self.list.SetStringItem(i, column, value)
            self.fileList[file][slot] = value


if __name__ == "__main__":
    app = wx.App()
    MainWindow(None, -1, 'Leveler')
    app.MainLoop()