Skip to main content
 首页 » 编程设计

python之在 pygtk 应用程序上使用多线程以避免 GUI 卡住

2024年11月01日3haluo1

目前正在使用 Glade for GTK+ 3 开发一个相对复杂的 GUI,并且在理解连接按钮和信号的“操作方法”方面没有问题,而且我对线程有很好的了解。

这是我的应用程序的简化版本的代码,其中线程正在工作:

#!/usr/bin/env python 
import threading,logging 
from gi.repository import Gtk,GObject,Gdk  
import os 
import time 
# Log everything from DEBUG upwards; each record shows its level and the
# name of the thread that produced it.
logging.basicConfig(level=logging.DEBUG, 
                format='[%(levelname)s] (%(threadName)-10s) %(message)s', 
                )  
# Absolute directory that contains this script.
Path=os.path.dirname(os.path.realpath(__file__)) 
# Enable GObject threading support once, at import time.
GObject.threads_init() 
 
class MyThread(threading.Thread,GObject.GObject):
    """Worker thread that reports the end of its work via a GObject signal.

    The class is both a ``threading.Thread`` and a ``GObject.GObject`` so
    that callers can ``connect`` to its "completed" signal.
    """

    # Signal table: "completed" takes no arguments and returns nothing.
    __gsignals__ = {
        "completed": (GObject.SIGNAL_RUN_LAST, GObject.TYPE_NONE, []),
    }

    def __init__(self, *args):
        """Initialise both base classes and name the thread after ``args[0]``."""
        threading.Thread.__init__(self)
        GObject.GObject.__init__(self)
        self.cancelled = False
        self.name = args[0]
        self.setName("%s" % self.name)

    def run(self):
        """Sleep for three seconds, then queue emission of "completed"."""
        print("Running %s" % str(self))
        # Placeholder for the real work the thread would do.
        time.sleep(3)
        logging.debug("Emiting signal...")
        # Queue the emission with idle_add instead of emitting directly
        # from this worker thread.
        GObject.idle_add(self.emit, "completed")
        for message in ("Thread ending...", "Done."):
            logging.debug(message)
 
class GUI(object):
    """Small demo window: a label, a spinner and a Start button."""

    def __init__(self):
        """Build the window, show it and run the GTK main loop.

        ``Gtk.main()`` is called from here, so the constructor only returns
        once ``Gtk.main_quit`` has run (connected to the window's 'destroy').
        """
        win = Gtk.Window()
        win.set_default_size(200, 200)
        win.connect('destroy', Gtk.main_quit)

        self.spinner = Gtk.Spinner()
        self.button = Gtk.Button('Start')
        self.button.connect('clicked', self.startAnimation)

        # Vertical layout: label on top, spinner in the middle, button row last.
        column = Gtk.VBox(False, 5)
        button_row = Gtk.HBox(True, 5)
        win.add(column)
        column.pack_start(Gtk.Label("Something"), False, False, 0)
        column.pack_start(self.spinner, True, True, 0)
        column.pack_end(button_row, False, False, 0)
        button_row.pack_start(self.button, True, True, 0)

        win.show_all()
        Gtk.main()

    def startAnimation(self, *args):
        """'clicked' handler: disable the button, spin, start the worker."""
        self.button.set_sensitive(False)
        self.spinner.start()
        worker = MyThread("Tsat_thread")
        worker.connect("completed", self.completed_thread)
        worker.start()

    def completed_thread(self, *args):
        """"completed" handler: stop the spinner and hide it."""
        logging.debug("Function called at ending thread...")
        print("COMPLETED signal catched")
        spinner = self.spinner
        spinner.stop()
        spinner.props.visible = False
        logging.debug("Done.")
 
# Script entry point. GUI.__init__ itself calls Gtk.main(), so constructing
# the object blocks here until the window is destroyed.
print "Start of main GUI" 
gui = GUI() 
#print "mostro la finestra"   

创建一个线程来管理外部进程,否则界面会卡住。线程完成时会发出 "completed" 信号,并由 MainThread 捕获(据我所知,只有 MainThread 才能访问窗口并应用更改)。这是程序的输出:

Start of main GUI  
Running MyThread(Tsat_thread, started 139636004558592)    
[DEBUG] (Tsat_thread) Emiting signal...    
[DEBUG] (Tsat_thread) Thread ending...    
[DEBUG] (Tsat_thread) Done.    
[DEBUG] (MainThread) Function called at ending thread...    
COMPLETED signal catched   
[DEBUG] (MainThread) Done. 

当我在自己的应用程序上尝试相同的方法时,MainThread 没有捕获到 "completed" 信号。我想了解问题出在哪里。以下是不起作用的相关代码:

import subprocess,threading,logging 
from gi.repository import Gtk,GObject,Gdk  
import os,datetime 
import timeit 
# Log everything from DEBUG upwards; each record shows its level and the
# name of the thread that produced it.
logging.basicConfig(level=logging.DEBUG, 
                format='[%(levelname)s] (%(threadName)-10s) %(message)s', 
                )  
 
# Absolute directory that contains this script.
Path=os.path.dirname(os.path.realpath(__file__)) 
# Enable GObject threading support once, at import time.
GObject.threads_init() 
... 
class MyThread(threading.Thread,GObject.GObject):
    """Worker thread that runs a ``dumpcap | tstat`` capture and signals its end.

    Positional arguments, all required:
        args[0]: thread name.
        args[1]: capture interface, passed to ``dumpcap -i``.
        args[2]: capture duration; must be a string because it is
            concatenated into the command line.
        args[3]: path of the tstat configuration file.
    """

    # Signal table: "completed" takes no arguments and returns nothing.
    __gsignals__ = {
        "completed": (GObject.SIGNAL_RUN_LAST, GObject.TYPE_NONE, []),
    }

    def __init__(self,*args):
        """Initialise both base classes and store the capture parameters."""
        threading.Thread.__init__(self)
        GObject.GObject.__init__(self)
        self.cancelled = False
        self.name = args[0]
        self.interface = args[1]
        self.duration = args[2]
        self.conf_tstat = args[3]
        self.setName("%s" % self.name)

    def run(self):
        """Run the capture pipeline, then queue emission of "completed".

        The first line of ``temp_log`` is the capture name
        (``<timestamp>.out``); the pipeline's stdout follows it.
        """
        print("Running %s" % str(self))
        # Make the configuration path relative to the script directory.
        local_conf_file = self.conf_tstat.replace(Path, ".")
        # NOTE(review): the fields are concatenated unquoted into a shell
        # command line; quote them if they can ever contain shell
        # metacharacters.
        cmd = "dumpcap -a duration:"+self.duration+" -i "+self.interface+" -P -w - |tstat s capture -N "+local_conf_file+" stdin"
        timestamp = datetime.datetime.now().strftime('%H_%M_%d_%b_%Y')
        with open("temp_log","wb") as logfile:
            logfile.write(timestamp+".out\n")
            # Flush before the child inherits the descriptor: otherwise the
            # header stays in Python's buffer and is written after the
            # child's output instead of on the first line.
            logfile.flush()
            process = subprocess.Popen(cmd, shell=True, universal_newlines=True, stdout=logfile)
            process.wait()
        logging.debug("Emiting signal...")
        # Queue the emission with idle_add instead of emitting directly
        # from this worker thread.
        GObject.idle_add(self.emit,"completed")
        logging.debug("Thread ending...")
        logging.debug("Done.")
... 
# On button execute clicking  
def on_button_execute_clicked(self, *args): 
    """Handle a click on the execute button.

    When the "radio_online" widget is active, show and start the spinner
    and launch a MyThread capture; its "completed" signal is connected to
    ``self.completed_tstat``. The offline branch is elided in this excerpt.
    """
 
    if (self.builder.get_object("radio_online").get_active()): 
        print "#Online" 
        self.builder.get_object("spinner1").start() 
        self.builder.get_object("spinner1").props.visible=True 
        entries=self.get_entries() 
 
        # MyThread stores these as name, interface, duration and tstat
        # configuration file, in that order.
        thread=MyThread("Tsat_thread",entries[0],entries[1],entries[2]) 
        thread.connect("completed",self.completed_tstat) 
        thread.start() 
 
    else: 
        print "#Offline" 
        ... 
... 
#function called after Tstat_thread finishes 
def completed_tstat(self,*args):
    """Handle the "completed" signal: post-process the capture, update the GUI.

    Steps:
      1. Read the capture name from the first line of ``temp_log`` and put
         the capture path into the "capture_entry" widget.
      2. Fall back to ``Path + "/output"`` when the output entry is empty.
      3. Delete earlier ``log_*`` / ``outLog.*`` results from the output folder.
      4. Run ``./python/trimInfo.py`` and report success or failure.
      5. Stop and hide the spinner.
    """
    import glob  # local import: only this handler needs it

    logging.debug("Function called at ending thread...")
    print("COMPLETED signal catched")
    with open("temp_log","rb") as logfile:
        capture_no = logfile.readline()
    capture_no = capture_no.strip('\n')
    self.builder.get_object("capture_entry").set_text(Path+"/capture/"+capture_no)
    self.list_of_entries = ["capture_entry","output_entry"]
    entries = self.get_entries()
    if entries[1]=='':
        # Default output folder.
        self.builder.get_object("output_entry").set_text(Path+"/output")
        entries = self.get_entries()
    # Presumably get_entries() returns values in list_of_entries order,
    # i.e. [capture path, output folder] -- confirm against get_entries().
    capture_dir = entries[0]
    output_dir = entries[1]

    # Delete results left by a previous capture. The wildcards are expanded
    # here because a command run with shell=False never expands them, so
    # the former "rm -f <dir>/log_*" call removed nothing.
    for pattern in ("/log_*", "/outLog.*"):
        for stale in glob.glob(output_dir+pattern):
            try:
                os.remove(stale)
            except OSError as err:
                # Best effort, like "rm -f": report and carry on.
                logging.warning("Could not remove %s: %s", stale, err)

    # Trim the info from the tstat log files. The argument list is built
    # directly so that paths containing spaces stay intact. The comma after
    # "log_tcp_complete" is kept from the original command line; presumably
    # trimInfo.py expects it -- confirm.
    command = [
        "python", "./python/trimInfo.py",
        "-t", capture_dir+"/log_tcp_complete,", capture_dir+"/log_tcp_nocomplete",
        "-u", capture_dir+"/log_udp_complete",
        "-o", output_dir,
    ]
    process = subprocess.Popen(command, shell=False, stdout=subprocess.PIPE)
    # communicate() drains the pipe while waiting; wait() alone can
    # deadlock once the child has filled the stdout pipe.
    process.communicate()
    returncode = process.returncode
    if returncode == 0:
        self.builder.get_object("entry_output_folder").set_text(output_dir)
        self.builder.get_object("button_show_out_folder").props.sensitive = True
        self.show_info("Capture+Transformation of traffic data ended successfully!")
        self.set_page_complete()
    else:
        # NOTE(review): returncode is an int; if self.my_log is a string this
        # addition raises TypeError -- confirm the type of my_log.
        self.my_log += returncode
        # The log lives on the instance; the bare name "my_log" was undefined.
        self.handle_program_error(self.my_log)

    self.builder.get_object("spinner1").stop()
    self.builder.get_object("spinner1").props.visible = False
    logging.debug("Done.")

completed_tstat 是回调,但在我的应用程序中从未被调用。当我尝试多线程时,我的应用程序的输出如下:

Start of main GUI 
#Online 
device_entry= 'wlan0' 
time_spinbutton= '10' 
tstat_conf_entry= './codicePippo/tstat-2.3/tstat-conf/net.private' 
capture_entry= './capture/' 
output_entry= './output' 
Running <MyThread(Tsat_thread, started 140689548355328)> 
Capturing on wlan0 
File: - 
 
Packets: 6  
Packets: 7  
Packets: 10  
Packets: 13  
Packets: 14  
Packets: 15  
Packets: 20  
Packets: 23  
Packets: 24  
Packets captured: 24 
Packets received/dropped on interface wlan0: 24/0 (100.0%) 
 
WARN: This udp flow is neither incoming nor outgoing: src - 192.168.1.27; dst - 192.168.1.255! 
[DEBUG] (Tsat_thread) Emiting signal... 
[DEBUG] (Tsat_thread) Thread ending... 
[DEBUG] (Tsat_thread) Done. 

请您参考如下方法:

我建议避免使用线程和子进程,我之前发布的这个答案使用了异步调用,它是为 pygobject(自省(introspection))编写的,但它应该很容易移植到 pygtk

button Stop/Cancel progressBar from subprocess PYGTK