Revision: 5277
Updated Code
at February 24, 2008 23:04 by stagger
Updated Code
#!/usr/bin/env python # encoding: utf-8 """ collector.py Created by Daniel Link on 2007-12-05. Copyright (c) 2007 Top Trend Manucfacturing Co., Ltd.. All rights reserved. """ import wx import wx.lib.scrolledpanel as scrolled import wx.lib.newevent from wx.lib.evtmgr import eventManager import sys import os import pymssql import softpro import validators import string import csv import time import thread FB_SO = 1 FB_CU = 2 FB_PR = 4 FB_MO = 8 FB_OD = 16 FB_CD = 32 FB_SM = 64 # This creates a new Event class and a EVT binder function (ThreadDone, EVT_THREAD_DONE) = wx.lib.newevent.NewEvent() class MainApp(wx.App): """ Collects various information from SQL server """ def OnInit(self): """ Create main frame and initialize variables """ self.connected = False # SQL server connection status self.verbose = False self.main_frame = MainFrame(None, -1, "SoftPRO data collector") self.nb = self.main_frame.notebook self.thread_id = 1 self.threads = {} # SQL events: eventManager.Register(self.OnConnect, wx.EVT_BUTTON, self.nb.sql_win.connect_button) eventManager.Register(self.OnConnect, wx.EVT_TEXT_ENTER, self.nb.sql_win.host_edit) eventManager.Register(self.OnConnect, wx.EVT_TEXT_ENTER, self.nb.sql_win.user_edit) eventManager.Register(self.OnConnect, wx.EVT_TEXT_ENTER, self.nb.sql_win.password_edit) eventManager.Register(self.OnConnect, wx.EVT_TEXT_ENTER, self.nb.sql_win.database_edit) # Job events: eventManager.Register(self.OnOPCollect, wx.EVT_BUTTON, self.nb.pages["op_win"].collect_button) #eventManager.Register(self.OnCostCollect, wx.EVT_BUTTON, self.nb.pages["costs_win"].collect_button) # Thread events: eventManager.Register(self.OnClose, wx.EVT_CLOSE, self.main_frame) #eventManager.Register(self.OnGaugeUpdate, EVT_GAUGE_UPDATE, self.main_frame) eventManager.Register(self.OnThreadDone, EVT_THREAD_DONE, self.main_frame) self.main_frame.Show(True) return True def OnConnect(self, e): """ Connect / Disconnect from SQL Server """ nb = self.main_frame.notebook sql = self.main_frame.notebook.sql_win host = sql.host_edit.GetValue() user = sql.user_edit.GetValue() password = sql.password_edit.GetValue() database = sql.database_edit.GetValue() if not self.connected: # connect try: self.sql_connection = pymssql.connect(host=host, user=user, password=password, database=database) except: wx.LogError("Connection failed.nnException: " + str(sys.exc_info()[0])) raise self.connected = True for widget in sql.login_widgets: widget.Disable() wx.LogStatus("Connected to %s as %s" % (host, user)) sql.connect_button.SetLabel("Disconnect") for page in nb.pages.itervalues(): page.Enable() else: # disconnect self.sql_connection.close() self.connected = False for widget in sql.login_widgets: widget.Enable() wx.LogStatus("Disconnected %s from %s" % (user, host)) sql.connect_button.SetLabel("Connect") for page in nb.pages.itervalues(): page.Disable() def OnOPCollect(self, e): """ Collect OP strings which pass through filter and write them to disk """ dlg = wx.FileDialog(self.main_frame, message="Save file as ...", defaultDir=os.getcwd(), defaultFile="", wildcard="CSV Table for Excel Import|*.csv", style=wx.SAVE) if dlg.ShowModal() != wx.ID_OK: return op_filter_box = self.nb.op_filter_box cur = self.sql_connection.cursor() # Disable some widgets self.nb.pages["op_win"].collect_button.Disable() op_filter_box.Disable() wx.LogStatus("Searching for product variants...") # get all sales orders that pass through filter name = "od20sord" t = op_filter_box.Translate(name) query = "SELECT %s.fsono" % t #for field in op_filter_box.GetCheckedFields(): # if (field != t + ".fsono"): # query += ", " + op_filter_box.MakeShort(field) query += " FROM %s %s WHERE %s.fitemno1 LIKE ' 1' " % (name, t, t) if not op_filter_box.HasChecked(): wx.LogWarning("You have chosen not to filter output. The operation will probably take a very long time.") else: query += op_filter_box.GetString() query += " GROUP BY %s.fsono" % t wx.LogVerbose("Executing Query: %s" % query) try: cur.execute(query) except: message = "SQL Query failed. " if wx.Log.GetVerbose(): message += "Check the log for details. Does the query correctly execute when you run it manually with Microsoft SQL Enterprise Manager?" else: message += "Please enable verbose output on the logging page and repeat what you just did." wx.LogError(message + "nnException: " + str(sys.exc_info()[0])) wx.LogStatus("") raise results = cur.fetchall() if not results: wx.LogWarning("Your query was too restrictive. The server did not return any results. Sorry.") return codes = [] for row in results: codes.append(row[0]) wx.LogStatus("Finding OP codes for %d Sales Orders..." % len(codes)) wx.LogVerbose("Get some coffee! Seriously!") # Start thread here assert not self.threads.has_key("op"), "The OP thread seems to be running already." self.threads["op"] = OPCollectThread(self.main_frame, self.thread_id, cur, self.nb.op_gauge, self.nb.op_remain, codes, dlg.GetPath()+".csv") self.threads["op"].Start() button = self.nb.pages["op_win"].collect_button eventManager.DeregisterWindow(button) eventManager.Register(self.OnOPAbort, wx.EVT_BUTTON, button) button.SetLabel("Abort") button.Enable() self.thread_id += 1 def OnCostCollect(self, e): """Gives all the costs which pass through the filter""" pass def OnClose(self, e): """ Stop running threads on window close """ if len(self.threads) > 0: busy = wx.BusyInfo("Shutting down pending processes...") for t in self.threads.itervalues(): t.Stop() running = 1 while running: running = 0 for t in self.threads.itervalues(): running = running + t.IsRunning() time.sleep(0.1) # Pass event to the next handler (and, thus, close the window) e.Skip() def OnThreadDone(self, e): t = e.type assert self.threads.has_key(t), "There is no %s thread!" % t del self.threads[t] button = self.nb.pages[t+"_win"].collect_button eventManager.DeregisterWindow(button) eventManager.Register(self.OnOPCollect, wx.EVT_BUTTON, button) button.SetLabel("Collect") def OnOPAbort(self, e): assert self.threads.has_key("op"), "The process can not be aborted, because it is not running!" self.threads["op"].Stop() wx.LogStatus("OP Collection aborted.") class BasicThread: def __init__(self, parent, id, cur, gauge, remain, filename): """ Initialize Thread parent: where EVT_GAUGE_UPDATE shall be sent cur: pymssql SQL connection cursor gauge: wx.Gauge remain: wx.StaticText for remaining time filename: where to save the results Derived classes _must_ set self.total in their __init__! """ self.parent = parent self.cur = cur self.gauge = gauge self.remain = remain self.filename = filename self.running = False self.keepgoing = False self.progress = 0 def Start(self): self.running = True self.keepgoing = True self.start_time = time.time() wx.LogVerbose("Starting Thread...") thread.start_new_thread(self.Run, ()) def Stop(self): self.keepgoing = False def IsRunning(self): return self.running def Run(self): self.running = False class OPCollectThread(BasicThread): def __init__(self, parent, id, cur, gauge, remain, sorders, filename): """ Initialize Thread sorders: list of sales order codes """ BasicThread.__init__(self, parent, id, cur, gauge, remain, filename) self.sorders = sorders self.total = len(sorders) wx.CallAfter(gauge.SetRange, self.total) self.opdic = {} def Run(self): # collect OP codes for current sales order: while self.keepgoing and (self.progress < self.total): code = self.sorders[self.progress] # code of current sales order sorder = softpro.SalesOrder(self.cur, code) for soitem in sorder.items: bom = soitem.bom if bom: opstring = soitem.get_opstring() qty = int(soitem.bom.quantity) if opstring: if not self.opdic.has_key(opstring): self.opdic[opstring] = qty else: self.opdic[opstring] += qty wx.LogVerbose("%s:%s (%6d) => %s" % (code, soitem.number.lstrip(), soitem.bom.quantity, opstring)) self.progress += 1 # Update Gauge wx.CallAfter(self.gauge.SetValue, self.progress) # Update StaticText with remaining time elapsed = time.time() - self.start_time per_so = elapsed/self.progress remain = per_so*(self.total-self.progress) text = time.strftime("%H:%M:%S", time.gmtime(remain)) wx.CallAfter(self.remain.SetLabel, text) # if all sales orders have been processed, write them to disk: if (self.progress == self.total): wx.LogVerbose("%d product variants could be identified." % len(self.opdic)) writer = csv.writer(file(self.filename, "w"), dialect="excel", delimiter=";") position = 0 wx.LogVerbose("Writing %s rows to " % self.filename) for op, quantity in self.opdic.items(): position += 1 writer.writerow([op, quantity]) if (position % 100 == 0): wx.LogVerbose("%d rows written" % position) wx.CallAfter(wx.LogStatus, "Wrote %d product variants to %s" % (position, self.filename)) # reset Gauge wx.CallAfter(self.gauge.SetValue, 0) # signal thread stop self.running = False evt = ThreadDone(type="op") wx.PostEvent(self.parent, evt) class MainFrame(wx.Frame): def __init__(self,parent,id,title): wx.Frame.__init__(self, parent, wx.ID_ANY, title, size=(800,600), style=wx.DEFAULT_FRAME_STYLE) self.CreateStatusBar() sizer = wx.BoxSizer(wx.VERTICAL) self.notebook = notebook = MainNotebook(self, wx.ID_ANY) sizer.Add(notebook, 1, wx.EXPAND) self.SetSizer(sizer) class MainNotebook(wx.Notebook): def __init__(self, parent, id): """ Add pages and disable all except SQL Connection """ wx.Notebook.__init__(self, parent, id) self.pages = {} # for easy disabling of collector pages self.AddGeneralPage() self.AddOPPage() #self.AddCostsPage() for page in self.pages.itervalues(): page.Disable() def AddGeneralPage(self): """ Add Page for SQL Connection """ row = 1 self.sql_win = win = wx.Panel(self, wx.ID_ANY) sizer = wx.BoxSizer(wx.VERTICAL) win.login_widgets = [] login_box = wx.StaticBox(win, wx.ID_ANY, "SQL Connection") login_box_sizer = wx.StaticBoxSizer(login_box, wx.VERTICAL) login_sizer = wx.FlexGridSizer(cols=2, vgap=10, hgap=10) login_sizer.AddGrowableCol(1) # Host text = wx.StaticText(win, wx.ID_ANY, "Host:") win.login_widgets.append(text) login_sizer.Add(text, 0) sql_host = "server ip" if (sys.platform == "darwin"): # Mac needs port number too sql_host += ":1433" win.host_edit = edit = wx.TextCtrl(win, wx.ID_ANY, sql_host, validator=validators.HostValidator(), style=wx.TE_PROCESS_ENTER) win.login_widgets.append(edit) login_sizer.Add(edit, 1, wx.EXPAND) row += 1 # User text = wx.StaticText(win, wx.ID_ANY, "User:") win.login_widgets.append(text) login_sizer.Add(text, 0) win.user_edit = edit = wx.TextCtrl(win, wx.ID_ANY, "user name", style=wx.TE_PROCESS_ENTER) win.login_widgets.append(edit) login_sizer.Add(edit, 1, wx.EXPAND) row += 1 # Password text = wx.StaticText(win, wx.ID_ANY, "Password:") win.login_widgets.append(text) login_sizer.Add(text, 0) win.password_edit = edit = wx.TextCtrl(win, wx.ID_ANY, style=wx.TE_PASSWORD | wx.TE_PROCESS_ENTER) edit.SetFocus() win.login_widgets.append(edit) login_sizer.Add(edit, 1, wx.EXPAND) row += 1 # Database text = wx.StaticText(win, wx.ID_ANY, "Database:") win.login_widgets.append(text) login_sizer.Add(text, 0) win.database_edit = edit = wx.TextCtrl(win, wx.ID_ANY, "TTM", style=wx.TE_PROCESS_ENTER) win.login_widgets.append(edit) login_sizer.Add(edit, 1, wx.EXPAND) row += 1 # Events eventManager.Register(self.OnSQLChange, wx.EVT_TEXT, win) # Put in supersizer login_box_sizer.Add(login_sizer, 0, wx.EXPAND) login_box_sizer.AddSpacer((1,10)) # Connect Button win.connect_button = button = wx.Button(win, wx.ID_ANY, "Connect") login_box_sizer.Add(button, 0, wx.EXPAND) # Add box sizer to window sizer sizer.Add(login_box_sizer, 0, wx.EXPAND) log_box = wx.StaticBox(win, wx.ID_ANY, "Logging") log_box_sizer = wx.StaticBoxSizer(log_box, wx.VERTICAL) # Toggle Verbose Mode self.check_verbose = check = wx.CheckBox(win, wx.ID_ANY, "Verbose Mode") eventManager.Register(self.OnVerboseCheck, wx.EVT_CHECKBOX, check) log_box_sizer.Add(check, 0) sizer.Add(log_box_sizer, 0, wx.EXPAND) win.SetSizer(sizer) self.AddPage(win, "General") def AddOPPage(self): """ Add Page for OP Sequence """ self.pages["op_win"] = win = scrolled.ScrolledPanel(self, wx.ID_ANY) sizer = wx.BoxSizer(wx.VERTICAL) """ # Output options self.op_group_box = box = GroupBox(win, wx.ID_ANY, "Output Fields") sizer.Add(box.sizer, 0, wx.EXPAND) """ # Filter self.op_filter_box = box = FilterBox(win, wx.ID_ANY, "Filter by", style=FB_SO|FB_OD|FB_CD) sizer.Add(box.sizer, 0, wx.EXPAND) # Gauge subsizer = wx.BoxSizer(wx.HORIZONTAL) self.op_gauge = gauge = wx.Gauge(win, wx.ID_ANY) subsizer.Add(gauge, 1) subsizer.AddSpacer((5,1)) self.op_remain = text = wx.StaticText(win, wx.ID_ANY, "00:00:00") subsizer.Add(text, 0) subsizer.AddSpacer((5,1)) sizer.Add(subsizer, 0, wx.EXPAND) # Collect Button win.collect_button = button = wx.Button(win, wx.ID_ANY, "Collect") sizer.Add(button, 0, wx.EXPAND) """ # Test Button button = wx.Button(win, wx.ID_ANY, "Print SQL Filter and Group string") eventManager.Register(self.OnTestButton, wx.EVT_BUTTON, button) sizer.Add(button, 0, wx.EXPAND) """ win.SetSizer(sizer) win.SetupScrolling() self.AddPage(win, "OP Sequence") def AddCostsPage(self): """ Add Page for Actual Costs """ self.pages["costs_win"] = win = scrolled.ScrolledPanel(self, wx.ID_ANY) sizer = wx.BoxSizer(wx.VERTICAL) # Output options self.costs_group_box = box = GroupBox(win, wx.ID_ANY, "Output Fields") sizer.Add(box.sizer, 0, wx.EXPAND) # Filter self.costs_filter_box = box = FilterBox(win, wx.ID_ANY, "Filter by") sizer.Add(box.sizer, 0, wx.EXPAND) # Gauge self.costs_gauge = gauge = wx.Gauge(win, wx.ID_ANY) sizer.Add(gauge, 0, wx.EXPAND) # Collect Button win.collect_button = button = wx.Button(win, wx.ID_ANY, "Collect") sizer.Add(button, 0, wx.EXPAND) win.SetSizer(sizer) win.SetupScrolling() self.AddPage(win, "Actual Costs") def OnSQLChange(self, event): """ Disable Connect button when required edit field is empty """ if event.GetString() == "": self.connect_button.Disable() else: self.connect_button.Enable() def OnVerboseCheck(self, event): wx.Log.SetVerbose(self.check_verbose.GetValue()) def OnTestButton(self, event): pass class SQLBox(wx.StaticBox): def __init__(self, parent, id, label): wx.StaticBox.__init__(self, parent, id, label) self.row = 0 # "internal name": ("name on server", "output title") self.sql_fields = {"sales order": ("od20sord.fsono", "Sales Order"), "so item": ("od20sord.fitemno1", "Sales Order Item"), "work order": ("pd20woi1.forderno", "Work Order"), "op seq": ("bd01stb1.fopseq", "OP Sequence"), "op code": ("bd01stb1.fopcode", "OP Code"), "comp code": ("bd01stb2.fcompcode", "Component Code"), "comp qty": ("bd01stb2.fqty", "Component Quantity"), "customer": ("empty", "Customer ID"), "product": ("od20sord.fpdcode", "Product Code"), "mould": ("empty", "Mould Code"), "order date": ("od20sord.fsodate", "Order Date"), "commit date": ("od20sord.fddate", "Commit Date"), "order qty": ("od20sord.fqty", "Order Quantity"), "plan qty": ("pd20woi1.fwoqty", "Plan Quantity"), "salesman": ("empty", "Salesman ID")} # building dict like: {"Product Code":"od20sord.fpdcode"} self.titles = {} for field in self.sql_fields.iterkeys(): key = self.sql_fields[field][1] value = field self.titles[key] = value # building dict like: {"od20sord.fpdcode":"product"} self.names = {} for field in self.sql_fields.iterkeys(): key = self.sql_fields[field][0] value = field self.names[key] = value # building table translator dictionary likke: {"od20sord": "t1"} self.translator = {} index = 1 for name in self.names.iterkeys(): if self.translator.has_key(name): continue table = name.rstrip(string.letters).replace(".", "") self.translator[table] = "t%d" % index index += 1 def GetTitles(self): """ Gives a simple list of field descriptions """ field_names = [] for title in self.titles.iterkeys(): field_names.append(title) return field_names def Translate(self, table): """ Give table's short identifier """ assert self.translator.has_key(table), "No dictionary entry for %s!" % table return self.translator[table] def MakeShort(self, field): """ Replaces table in table.field with short identifier """ alphanum = string.letters + string.digits table = field.rstrip(alphanum).replace(".", "") table = self.Translate(table) field = field.lstrip(alphanum) complete = table + field assert complete, "%s does not have the expected format "table.field"" return complete class FilterBox(SQLBox): def __init__(self, parent, id, label, style=FB_SO|FB_CU|FB_PR|FB_MO|FB_OD|FB_CD|FB_SM): SQLBox.__init__(self, parent, id, label) self.ranges = {} # {name+"_min": textctrl, name+"_max": textctrl} self.checks = {} # {name: checkbox} self.selects = {} # {name+"_sel": listbox} self.ids = {} # {button_id: listbox/editctrl} self.id = 100 self.sizer = wx.StaticBoxSizer(self, wx.VERTICAL) self.subsizer = wx.GridBagSizer(vgap=10, hgap=10) self.style = style def AddRangeLine(linetype, name): self.ids[self.id] = {} self.ids[self.id+1] = {} parent = self.GetParent() title = self.sql_fields[name][1] if linetype == "text": self.ids[self.id]["textctrl"] = self.ranges[name+"_min"] = wx.TextCtrl(parent, wx.ID_ANY) self.ids[self.id]["textctrl"] = self.ranges[name+"_max"] = wx.TextCtrl(parent, wx.ID_ANY) elif linetype == "date": self.ids[self.id]["datepickerctrl"] = self.ranges[name+"_min"] = wx.DatePickerCtrl(parent, wx.ID_ANY, style = wx.DP_DEFAULT | wx.DP_SHOWCENTURY) self.ids[self.id+1]["datepickerctrl"] = self.ranges[name+"_max"] = wx.DatePickerCtrl(parent, wx.ID_ANY, style = wx.DP_DEFAULT | wx.DP_SHOWCENTURY) else: raise "Unknown type of range line: " + linetype self.checks[name] = check = wx.CheckBox(parent, wx.ID_ANY, title) self.subsizer.Add(check, (self.row,0)) self.subsizer.Add(wx.StaticText(parent, wx.ID_ANY, "min:"), (self.row,1)) self.subsizer.Add(self.ranges[name+"_min"], (self.row,2)) self.subsizer.Add(wx.StaticText(parent, wx.ID_ANY, "max:"), (self.row,3)) self.subsizer.Add(self.ranges[name+"_max"], (self.row,4)) self.id += 2 self.row += 1 def AddSelectLine(name): self.ids[self.id] = {} parent = self.GetParent() title = "Certain %ss" % self.sql_fields[name][1] self.checks[name+"_sel"] = check = wx.CheckBox(parent, wx.ID_ANY, title) self.subsizer.Add(check, (self.row,0)) self.ids[self.id]["textctrl"] = self.selects[name+"_sel"] = wx.TextCtrl(parent, wx.ID_ANY) self.subsizer.Add(self.selects[name+"_sel"], (self.row,2)) button = wx.Button(parent, self.id, "Add") eventManager.Register(self.OnListAdd, wx.EVT_BUTTON, button) self.subsizer.Add(button, (self.row,3)) self.ids[self.id]["listbox"] = self.selects[name] = wx.ListBox(parent, wx.ID_ANY, style=wx.LB_SINGLE | wx.LB_NEEDED_SB | wx.LB_SORT) self.subsizer.Add(self.selects[name], (self.row,4)) button = wx.Button(parent, self.id, "Remove") eventManager.Register(self.OnListDel, wx.EVT_BUTTON, button) self.subsizer.Add(button, (self.row,5)) self.id += 1 self.row += 1 if style & FB_SO: AddRangeLine("text", "sales order") if style & FB_CU: AddRangeLine("text", "customer") AddSelectLine("customer") if style & FB_PR: AddRangeLine("text", "product") AddSelectLine("product") if style & FB_MO: AddRangeLine("text", "mould") AddSelectLine("mould") if style & FB_OD: AddRangeLine("date", "order date") if style & FB_CD: AddRangeLine("date", "commit date") if style & FB_SM: AddRangeLine("text", "salesman") self.sizer.Add(self.subsizer, 0, wx.EXPAND) def GetString(self): """gives selected filters as SQL conditions""" sql_filter = "" def from_select(name): """Give SQL condition from listbox items""" if not self.checks[name+"_sel"].GetValue(): return "" sel = self.selects[name+"_sel"] count = sel.GetCount() items = [] sql_filter = "" field = self.sql_fields[name][0] field = self.MakeShort(field) for pos in range(count): string = sel.GetString(pos) items.append(string) for item in items: sql_filter += "OR %s LIKE %s " % (field, item) return sql_filter def from_range(fieldtype, name): "Give SQL range condition from TextCtrl (edit) fields" if not self.checks[name].GetValue(): return "" tmp = "" field = self.sql_fields[name][0] field = self.MakeShort(field) min_val = self.ranges[name+"_min"].GetValue() if min_val: if fieldtype == "date": tmp += "AND DATEPART(dd, %s) >= %s " % (field, min_val.GetDay()) tmp += "AND DATEPART(mm, %s) >= %s " % (field, min_val.GetMonth()) tmp += "AND DATEPART(yyyy, %s) >= %s " % (field, min_val.GetYear()) else: tmp += "AND %s >= %s " % (field, min_val) max_val = self.ranges[name+"_max"].GetValue() if max_val: if fieldtype == "date": tmp += "AND DATEPART(dd, %s) <= %s " % (field, max_val.GetDay()) tmp += "AND DATEPART(mm, %s) <= %s " % (field, max_val.GetMonth()) tmp += "AND DATEPART(yyyy, %s) <= %s " % (field, max_val.GetYear()) else: tmp += "AND %s <= %s " % (field, max_val) return tmp if self.style & FB_SO: sql_filter += from_range("text", "sales order") if self.style & FB_CU: sql_filter += from_range("text", "customer") sql_filter += from_select("customer") if self.style & FB_PR: sql_filter += from_range("text", "product") sql_filter += from_select("product") if self.style & FB_MO: sql_filter += from_range("text", "mould") sql_filter += from_select("mould") if self.style & FB_OD: sql_filter += from_range("date", "order date") if self.style & FB_CD: sql_filter += from_range("date", "commit date") if self.style & FB_SM: sql_filter += from_range("text", "salesman") return sql_filter def GetCheckedFields(self): """ Gives a list of all checked SQL field names. """ fields = [] for name, check in self.checks.items(): field = self.sql_fields[name.replace("_sel", "")][0] if check.GetValue() and (field != "empty"): fields.append(field) return fields def OnListAdd(self, e): """Add a value from an edit field to a listbox""" listbox = self.ids[e.GetId()]["listbox"] edit = self.ids[e.GetId()]["textctrl"] value = edit.GetValue() if value: listbox.Append(value) def OnListDel(self, e): """Delete the selected value from a listbox""" listbox = self.ids[e.GetId()]["listbox"] edit = self.ids[e.GetId()]["textctrl"] selection = listbox.GetSelections() if selection: listbox.Delete(selection[0]) def HasChecked(self): """ True if any item is checked. """ result = False for checkbox in self.checks.itervalues(): if checkbox.GetValue(): result = True return result class GroupBox(SQLBox): def __init__(self, parent, id, label): SQLBox.__init__(self, parent, id, label) # create and add the CheckListBox self.sizer = wx.StaticBoxSizer(self, wx.VERTICAL) self.checklist = wx.CheckListBox(parent, wx.ID_ANY, choices=self.GetTitles(), style=wx.LB_SORT) self.sizer.Add(self.checklist, 1, wx.EXPAND) def GetSelection(self): """ Gives a dictionary of selected fields like: {"title": "field name"} """ count = self.checklist.GetCount() checked = {} for index in range(count): if self.checklist.IsChecked(index): key = self.checklist.GetString(index) value = self.titles[key] checked[key] = value return checked def GetString(self, get_all=False): """gives selected columns as SQL GROUP BY for output""" sql_group = "" def is_empty(name): if (name == "empty"): wx.LogError("Missing SQL field translation!") return True return False if get_all: for value in self.sql_fields.itervalues(): if is_empty(value[0]): continue sql_group += value[0] + ", " else: for name in self.GetSelection().itervalues(): if is_empty(name): continue sql_group += name + ", " if sql_group: sql_group = "GROUP BY " + sql_group.rstrip(", ") + " ORDER BY " + self.sql_fields["sales order"][0] return sql_group def HasChecked(self): """ True if any item is checked. """ return (len(self.GetSelection()) > 0) if __name__ == '__main__': App = MainApp(redirect = 0) # redirect stdio to console App.SetAppName("SoftPRO Data Collector") App.MainLoop()
Revision: 5276
Initial Code
Initial URL
Initial Description
Initial Title
Initial Tags
Initial Language
at February 24, 2008 23:00 by stagger
Initial Code
#!/usr/bin/env python # encoding: utf-8 """ collector.py Created by Daniel Link on 2007-12-05. Copyright (c) 2007 Top Trend Manucfacturing Co., Ltd.. All rights reserved. """ import wx import wx.lib.scrolledpanel as scrolled import wx.lib.newevent from wx.lib.evtmgr import eventManager import sys import os import pymssql import softpro import validators import string import csv import time import thread FB_SO = 1 FB_CU = 2 FB_PR = 4 FB_MO = 8 FB_OD = 16 FB_CD = 32 FB_SM = 64 # This creates a new Event class and a EVT binder function (ThreadDone, EVT_THREAD_DONE) = wx.lib.newevent.NewEvent() class MainApp(wx.App): """ Collects various information from SQL server """ def OnInit(self): """ Create main frame and initialize variables """ self.connected = False # SQL server connection status self.verbose = False self.main_frame = MainFrame(None, -1, "SoftPRO data collector") self.nb = self.main_frame.notebook self.thread_id = 1 self.threads = {} # SQL events: eventManager.Register(self.OnConnect, wx.EVT_BUTTON, self.nb.sql_win.connect_button) eventManager.Register(self.OnConnect, wx.EVT_TEXT_ENTER, self.nb.sql_win.host_edit) eventManager.Register(self.OnConnect, wx.EVT_TEXT_ENTER, self.nb.sql_win.user_edit) eventManager.Register(self.OnConnect, wx.EVT_TEXT_ENTER, self.nb.sql_win.password_edit) eventManager.Register(self.OnConnect, wx.EVT_TEXT_ENTER, self.nb.sql_win.database_edit) # Job events: eventManager.Register(self.OnOPCollect, wx.EVT_BUTTON, self.nb.pages["op_win"].collect_button) #eventManager.Register(self.OnCostCollect, wx.EVT_BUTTON, self.nb.pages["costs_win"].collect_button) # Thread events: eventManager.Register(self.OnClose, wx.EVT_CLOSE, self.main_frame) #eventManager.Register(self.OnGaugeUpdate, EVT_GAUGE_UPDATE, self.main_frame) eventManager.Register(self.OnThreadDone, EVT_THREAD_DONE, self.main_frame) self.main_frame.Show(True) return True def OnConnect(self, e): """ Connect / Disconnect from SQL Server """ nb = self.main_frame.notebook sql = self.main_frame.notebook.sql_win host = sql.host_edit.GetValue() user = sql.user_edit.GetValue() password = sql.password_edit.GetValue() database = sql.database_edit.GetValue() if not self.connected: # connect try: self.sql_connection = pymssql.connect(host=host, user=user, password=password, database=database) except: wx.LogError("Connection failed.nnException: " + str(sys.exc_info()[0])) raise self.connected = True for widget in sql.login_widgets: widget.Disable() wx.LogStatus("Connected to %s as %s" % (host, user)) sql.connect_button.SetLabel("Disconnect") for page in nb.pages.itervalues(): page.Enable() else: # disconnect self.sql_connection.close() self.connected = False for widget in sql.login_widgets: widget.Enable() wx.LogStatus("Disconnected %s from %s" % (user, host)) sql.connect_button.SetLabel("Connect") for page in nb.pages.itervalues(): page.Disable() def OnOPCollect(self, e): """ Collect OP strings which pass through filter and write them to disk """ dlg = wx.FileDialog(self.main_frame, message="Save file as ...", defaultDir=os.getcwd(), defaultFile="", wildcard="CSV Table for Excel Import|*.csv", style=wx.SAVE) if dlg.ShowModal() != wx.ID_OK: return op_filter_box = self.nb.op_filter_box cur = self.sql_connection.cursor() # Disable some widgets self.nb.pages["op_win"].collect_button.Disable() op_filter_box.Disable() wx.LogStatus("Searching for product variants...") # get all sales orders that pass through filter name = "od20sord" t = op_filter_box.Translate(name) query = "SELECT %s.fsono" % t #for field in op_filter_box.GetCheckedFields(): # if (field != t + ".fsono"): # query += ", " + op_filter_box.MakeShort(field) query += " FROM %s %s WHERE %s.fitemno1 LIKE ' 1' " % (name, t, t) if not op_filter_box.HasChecked(): wx.LogWarning("You have chosen not to filter output. The operation will probably take a very long time.") else: query += op_filter_box.GetString() query += " GROUP BY %s.fsono" % t wx.LogVerbose("Executing Query: %s" % query) try: cur.execute(query) except: message = "SQL Query failed. " if wx.Log.GetVerbose(): message += "Check the log for details. Does the query correctly execute when you run it manually with Microsoft SQL Enterprise Manager?" else: message += "Please enable verbose output on the logging page and repeat what you just did." wx.LogError(message + "nnException: " + str(sys.exc_info()[0])) wx.LogStatus("") raise results = cur.fetchall() if not results: wx.LogWarning("Your query was too restrictive. The server did not return any results. Sorry.") return codes = [] for row in results: codes.append(row[0]) wx.LogStatus("Finding OP codes for %d Sales Orders..." % len(codes)) wx.LogVerbose("Get some coffee! Seriously!") # Start thread here assert not self.threads.has_key("op"), "The OP thread seems to be running already." self.threads["op"] = OPCollectThread(self.main_frame, self.thread_id, cur, self.nb.op_gauge, self.nb.op_remain, codes, dlg.GetPath()+".csv") self.threads["op"].Start() button = self.nb.pages["op_win"].collect_button eventManager.DeregisterWindow(button) eventManager.Register(self.OnOPAbort, wx.EVT_BUTTON, button) button.SetLabel("Abort") button.Enable() self.thread_id += 1 def OnCostCollect(self, e): """Gives all the costs which pass through the filter""" pass def OnClose(self, e): """ Stop running threads on window close """ if len(self.threads) > 0: busy = wx.BusyInfo("Shutting down pending processes...") for t in self.threads.itervalues(): t.Stop() running = 1 while running: running = 0 for t in self.threads.itervalues(): running = running + t.IsRunning() time.sleep(0.1) # Pass event to the next handler (and, thus, close the window) e.Skip() def OnThreadDone(self, e): t = e.type assert self.threads.has_key(t), "There is no %s thread!" % t del self.threads[t] button = self.nb.pages[t+"_win"].collect_button eventManager.DeregisterWindow(button) eventManager.Register(self.OnOPCollect, wx.EVT_BUTTON, button) button.SetLabel("Collect") def OnOPAbort(self, e): assert self.threads.has_key("op"), "The process can not be aborted, because it is not running!" self.threads["op"].Stop() wx.LogStatus("OP Collection aborted.") class BasicThread: def __init__(self, parent, id, cur, gauge, remain, filename): """ Initialize Thread parent: where EVT_GAUGE_UPDATE shall be sent cur: pymssql SQL connection cursor gauge: wx.Gauge remain: wx.StaticText for remaining time filename: where to save the results Derived classes _must_ set self.total in their __init__! """ self.parent = parent self.cur = cur self.gauge = gauge self.remain = remain self.filename = filename self.running = False self.keepgoing = False self.progress = 0 def Start(self): self.running = True self.keepgoing = True self.start_time = time.time() wx.LogVerbose("Starting Thread...") thread.start_new_thread(self.Run, ()) def Stop(self): self.keepgoing = False def IsRunning(self): return self.running def Run(self): self.running = False class OPCollectThread(BasicThread): def __init__(self, parent, id, cur, gauge, remain, sorders, filename): """ Initialize Thread sorders: list of sales order codes """ BasicThread.__init__(self, parent, id, cur, gauge, remain, filename) self.sorders = sorders self.total = len(sorders) wx.CallAfter(gauge.SetRange, self.total) self.opdic = {} def Run(self): # collect OP codes for current sales order: while self.keepgoing and (self.progress < self.total): code = self.sorders[self.progress] # code of current sales order sorder = softpro.SalesOrder(self.cur, code) for soitem in sorder.items: bom = soitem.bom if bom: opstring = soitem.get_opstring() qty = int(soitem.bom.quantity) if opstring: if not self.opdic.has_key(opstring): self.opdic[opstring] = qty else: self.opdic[opstring] += qty wx.LogVerbose("%s:%s (%6d) => %s" % (code, soitem.number.lstrip(), soitem.bom.quantity, opstring)) self.progress += 1 # Update Gauge wx.CallAfter(self.gauge.SetValue, self.progress) # Update StaticText with remaining time elapsed = time.time() - self.start_time per_so = elapsed/self.progress remain = per_so*(self.total-self.progress) text = time.strftime("%H:%M:%S", time.gmtime(remain)) wx.CallAfter(self.remain.SetLabel, text) # if all sales orders have been processed, write them to disk: if (self.progress == self.total): wx.LogVerbose("%d product variants could be identified." % len(self.opdic)) writer = csv.writer(file(self.filename, "w"), dialect="excel", delimiter=";") position = 0 wx.LogVerbose("Writing %s rows to " % self.filename) for op, quantity in self.opdic.items(): position += 1 writer.writerow([op, quantity]) if (position % 100 == 0): wx.LogVerbose("%d rows written" % position) wx.CallAfter(wx.LogStatus, "Wrote %d product variants to %s" % (position, self.filename)) # reset Gauge wx.CallAfter(self.gauge.SetValue, 0) # signal thread stop self.running = False evt = ThreadDone(type="op") wx.PostEvent(self.parent, evt) class MainFrame(wx.Frame): def __init__(self,parent,id,title): wx.Frame.__init__(self, parent, wx.ID_ANY, title, size=(800,600), style=wx.DEFAULT_FRAME_STYLE) self.CreateStatusBar() sizer = wx.BoxSizer(wx.VERTICAL) self.notebook = notebook = MainNotebook(self, wx.ID_ANY) sizer.Add(notebook, 1, wx.EXPAND) self.SetSizer(sizer) class MainNotebook(wx.Notebook): def __init__(self, parent, id): """ Add pages and disable all except SQL Connection """ wx.Notebook.__init__(self, parent, id) self.pages = {} # for easy disabling of collector pages self.AddGeneralPage() self.AddOPPage() #self.AddCostsPage() for page in self.pages.itervalues(): page.Disable() def AddGeneralPage(self): """ Add Page for SQL Connection """ row = 1 self.sql_win = win = wx.Panel(self, wx.ID_ANY) sizer = wx.BoxSizer(wx.VERTICAL) win.login_widgets = [] login_box = wx.StaticBox(win, wx.ID_ANY, "SQL Connection") login_box_sizer = wx.StaticBoxSizer(login_box, wx.VERTICAL) login_sizer = wx.FlexGridSizer(cols=2, vgap=10, hgap=10) login_sizer.AddGrowableCol(1) # Host text = wx.StaticText(win, wx.ID_ANY, "Host:") win.login_widgets.append(text) login_sizer.Add(text, 0) sql_host = "server ip" if (sys.platform == "darwin"): # Mac needs port number too sql_host += ":1433" win.host_edit = edit = wx.TextCtrl(win, wx.ID_ANY, sql_host, validator=validators.HostValidator(), style=wx.TE_PROCESS_ENTER) win.login_widgets.append(edit) login_sizer.Add(edit, 1, wx.EXPAND) row += 1 # User text = wx.StaticText(win, wx.ID_ANY, "User:") win.login_widgets.append(text) login_sizer.Add(text, 0) win.user_edit = edit = wx.TextCtrl(win, wx.ID_ANY, "user name", style=wx.TE_PROCESS_ENTER) win.login_widgets.append(edit) login_sizer.Add(edit, 1, wx.EXPAND) row += 1 # Password text = wx.StaticText(win, wx.ID_ANY, "Password:") win.login_widgets.append(text) login_sizer.Add(text, 0) win.password_edit = edit = wx.TextCtrl(win, wx.ID_ANY, style=wx.TE_PASSWORD | wx.TE_PROCESS_ENTER) edit.SetFocus() win.login_widgets.append(edit) login_sizer.Add(edit, 1, wx.EXPAND) row += 1 # Database text = wx.StaticText(win, wx.ID_ANY, "Database:") win.login_widgets.append(text) login_sizer.Add(text, 0) win.database_edit = edit = wx.TextCtrl(win, wx.ID_ANY, "TTM", style=wx.TE_PROCESS_ENTER) win.login_widgets.append(edit) login_sizer.Add(edit, 1, wx.EXPAND) row += 1 # Events eventManager.Register(self.OnSQLChange, wx.EVT_TEXT, win) # Put in supersizer login_box_sizer.Add(login_sizer, 0, wx.EXPAND) login_box_sizer.AddSpacer((1,10)) # Connect Button win.connect_button = button = wx.Button(win, wx.ID_ANY, "Connect") login_box_sizer.Add(button, 0, wx.EXPAND) # Add box sizer to window sizer sizer.Add(login_box_sizer, 0, wx.EXPAND) log_box = wx.StaticBox(win, wx.ID_ANY, "Logging") log_box_sizer = wx.StaticBoxSizer(log_box, wx.VERTICAL) # Toggle Verbose Mode self.check_verbose = check = wx.CheckBox(win, wx.ID_ANY, "Verbose Mode") eventManager.Register(self.OnVerboseCheck, wx.EVT_CHECKBOX, check) log_box_sizer.Add(check, 0) sizer.Add(log_box_sizer, 0, wx.EXPAND) win.SetSizer(sizer) self.AddPage(win, "General") def AddOPPage(self): """ Add Page for OP Sequence """ self.pages["op_win"] = win = scrolled.ScrolledPanel(self, wx.ID_ANY) sizer = wx.BoxSizer(wx.VERTICAL) """ # Output options self.op_group_box = box = GroupBox(win, wx.ID_ANY, "Output Fields") sizer.Add(box.sizer, 0, wx.EXPAND) """ # Filter self.op_filter_box = box = FilterBox(win, wx.ID_ANY, "Filter by", style=FB_SO|FB_OD|FB_CD) sizer.Add(box.sizer, 0, wx.EXPAND) # Gauge subsizer = wx.BoxSizer(wx.HORIZONTAL) self.op_gauge = gauge = wx.Gauge(win, wx.ID_ANY) subsizer.Add(gauge, 1) subsizer.AddSpacer((5,1)) self.op_remain = text = wx.StaticText(win, wx.ID_ANY, "00:00:00") subsizer.Add(text, 0) subsizer.AddSpacer((5,1)) sizer.Add(subsizer, 0, wx.EXPAND) # Collect Button win.collect_button = button = wx.Button(win, wx.ID_ANY, "Collect") sizer.Add(button, 0, wx.EXPAND) """ # Test Button button = wx.Button(win, wx.ID_ANY, "Print SQL Filter and Group string") eventManager.Register(self.OnTestButton, wx.EVT_BUTTON, button) sizer.Add(button, 0, wx.EXPAND) """ win.SetSizer(sizer) win.SetupScrolling() self.AddPage(win, "OP Sequence") def AddCostsPage(self): """ Add Page for Actual Costs """ self.pages["costs_win"] = win = scrolled.ScrolledPanel(self, wx.ID_ANY) sizer = wx.BoxSizer(wx.VERTICAL) # Output options self.costs_group_box = box = GroupBox(win, wx.ID_ANY, "Output Fields") sizer.Add(box.sizer, 0, wx.EXPAND) # Filter self.costs_filter_box = box = FilterBox(win, wx.ID_ANY, "Filter by") sizer.Add(box.sizer, 0, wx.EXPAND) # Gauge self.costs_gauge = gauge = wx.Gauge(win, wx.ID_ANY) sizer.Add(gauge, 0, wx.EXPAND) # Collect Button win.collect_button = button = wx.Button(win, wx.ID_ANY, "Collect") sizer.Add(button, 0, wx.EXPAND) win.SetSizer(sizer) win.SetupScrolling() self.AddPage(win, "Actual Costs") def OnSQLChange(self, event): """ Disable Connect button when required edit field is empty """ if event.GetString() == "": self.connect_button.Disable() else: self.connect_button.Enable() def OnVerboseCheck(self, event): wx.Log.SetVerbose(self.check_verbose.GetValue()) def OnTestButton(self, event): pass class SQLBox(wx.StaticBox): def __init__(self, parent, id, label): wx.StaticBox.__init__(self, parent, id, label) self.row = 0 # "internal name": ("name on server", "output title") self.sql_fields = {"sales order": ("od20sord.fsono", "Sales Order"), "so item": ("od20sord.fitemno1", "Sales Order Item"), "work order": ("pd20woi1.forderno", "Work Order"), "op seq": ("bd01stb1.fopseq", "OP Sequence"), "op code": ("bd01stb1.fopcode", "OP Code"), "comp code": ("bd01stb2.fcompcode", "Component Code"), "comp qty": ("bd01stb2.fqty", "Component Quantity"), "customer": ("empty", "Customer ID"), "product": ("od20sord.fpdcode", "Product Code"), "mould": ("empty", "Mould Code"), "order date": ("od20sord.fsodate", "Order Date"), "commit date": ("od20sord.fddate", "Commit Date"), "order qty": ("od20sord.fqty", "Order Quantity"), "plan qty": ("pd20woi1.fwoqty", "Plan Quantity"), "salesman": ("empty", "Salesman ID")} # building dict like: {"Product Code":"od20sord.fpdcode"} self.titles = {} for field in self.sql_fields.iterkeys(): key = self.sql_fields[field][1] value = field self.titles[key] = value # building dict like: {"od20sord.fpdcode":"product"} self.names = {} for field in self.sql_fields.iterkeys(): key = self.sql_fields[field][0] value = field self.names[key] = value # building table translator dictionary likke: {"od20sord": "t1"} self.translator = {} index = 1 for name in self.names.iterkeys(): if self.translator.has_key(name): continue table = name.rstrip(string.letters).replace(".", "") self.translator[table] = "t%d" % index index += 1 def GetTitles(self): """ Gives a simple list of field descriptions """ field_names = [] for title in self.titles.iterkeys(): field_names.append(title) return field_names def Translate(self, table): """ Give table's short identifier """ assert self.translator.has_key(table), "No dictionary entry for %s!" % table return self.translator[table] def MakeShort(self, field): """ Replaces table in table.field with short identifier """ alphanum = string.letters + string.digits table = field.rstrip(alphanum).replace(".", "") table = self.Translate(table) field = field.lstrip(alphanum) complete = table + field assert complete, "%s does not have the expected format "table.field"" return complete class FilterBox(SQLBox): def __init__(self, parent, id, label, style=FB_SO|FB_CU|FB_PR|FB_MO|FB_OD|FB_CD|FB_SM): SQLBox.__init__(self, parent, id, label) self.ranges = {} # {name+"_min": textctrl, name+"_max": textctrl} self.checks = {} # {name: checkbox} self.selects = {} # {name+"_sel": listbox} self.ids = {} # {button_id: listbox/editctrl} self.id = 100 self.sizer = wx.StaticBoxSizer(self, wx.VERTICAL) self.subsizer = wx.GridBagSizer(vgap=10, hgap=10) self.style = style def AddRangeLine(linetype, name): self.ids[self.id] = {} self.ids[self.id+1] = {} parent = self.GetParent() title = self.sql_fields[name][1] if linetype == "text": self.ids[self.id]["textctrl"] = self.ranges[name+"_min"] = wx.TextCtrl(parent, wx.ID_ANY) self.ids[self.id]["textctrl"] = self.ranges[name+"_max"] = wx.TextCtrl(parent, wx.ID_ANY) elif linetype == "date": self.ids[self.id]["datepickerctrl"] = self.ranges[name+"_min"] = wx.DatePickerCtrl(parent, wx.ID_ANY, style = wx.DP_DEFAULT | wx.DP_SHOWCENTURY) self.ids[self.id+1]["datepickerctrl"] = self.ranges[name+"_max"] = wx.DatePickerCtrl(parent, wx.ID_ANY, style = wx.DP_DEFAULT | wx.DP_SHOWCENTURY) else: raise "Unknown type of range line: " + linetype self.checks[name] = check = wx.CheckBox(parent, wx.ID_ANY, title) self.subsizer.Add(check, (self.row,0)) self.subsizer.Add(wx.StaticText(parent, wx.ID_ANY, "min:"), (self.row,1)) self.subsizer.Add(self.ranges[name+"_min"], (self.row,2)) self.subsizer.Add(wx.StaticText(parent, wx.ID_ANY, "max:"), (self.row,3)) self.subsizer.Add(self.ranges[name+"_max"], (self.row,4)) self.id += 2 self.row += 1 def AddSelectLine(name): self.ids[self.id] = {} parent = self.GetParent() title = "Certain %ss" % self.sql_fields[name][1] self.checks[name+"_sel"] = check = wx.CheckBox(parent, wx.ID_ANY, title) self.subsizer.Add(check, (self.row,0)) self.ids[self.id]["textctrl"] = self.selects[name+"_sel"] = wx.TextCtrl(parent, wx.ID_ANY) self.subsizer.Add(self.selects[name+"_sel"], (self.row,2)) button = wx.Button(parent, self.id, "Add") eventManager.Register(self.OnListAdd, wx.EVT_BUTTON, button) self.subsizer.Add(button, (self.row,3)) self.ids[self.id]["listbox"] = self.selects[name] = wx.ListBox(parent, wx.ID_ANY, style=wx.LB_SINGLE | wx.LB_NEEDED_SB | wx.LB_SORT) self.subsizer.Add(self.selects[name], (self.row,4)) button = wx.Button(parent, self.id, "Remove") eventManager.Register(self.OnListDel, wx.EVT_BUTTON, button) self.subsizer.Add(button, (self.row,5)) self.id += 1 self.row += 1 if style & FB_SO: AddRangeLine("text", "sales order") if style & FB_CU: AddRangeLine("text", "customer") AddSelectLine("customer") if style & FB_PR: AddRangeLine("text", "product") AddSelectLine("product") if style & FB_MO: AddRangeLine("text", "mould") AddSelectLine("mould") if style & FB_OD: AddRangeLine("date", "order date") if style & FB_CD: AddRangeLine("date", "commit date") if style & FB_SM: AddRangeLine("text", "salesman") self.sizer.Add(self.subsizer, 0, wx.EXPAND) def GetString(self): """gives selected filters as SQL conditions""" sql_filter = "" def from_select(name): """Give SQL condition from listbox items""" if not self.checks[name+"_sel"].GetValue(): return "" sel = self.selects[name+"_sel"] count = sel.GetCount() items = [] sql_filter = "" field = self.sql_fields[name][0] field = self.MakeShort(field) for pos in range(count): string = sel.GetString(pos) items.append(string) for item in items: sql_filter += "OR %s LIKE %s " % (field, item) return sql_filter def from_range(fieldtype, name): "Give SQL range condition from TextCtrl (edit) fields" if not self.checks[name].GetValue(): return "" tmp = "" field = self.sql_fields[name][0] field = self.MakeShort(field) min_val = self.ranges[name+"_min"].GetValue() if min_val: if fieldtype == "date": tmp += "AND DATEPART(dd, %s) >= %s " % (field, min_val.GetDay()) tmp += "AND DATEPART(mm, %s) >= %s " % (field, min_val.GetMonth()) tmp += "AND DATEPART(yyyy, %s) >= %s " % (field, min_val.GetYear()) else: tmp += "AND %s >= %s " % (field, min_val) max_val = self.ranges[name+"_max"].GetValue() if max_val: if fieldtype == "date": tmp += "AND DATEPART(dd, %s) <= %s " % (field, max_val.GetDay()) tmp += "AND DATEPART(mm, %s) <= %s " % (field, max_val.GetMonth()) tmp += "AND DATEPART(yyyy, %s) <= %s " % (field, max_val.GetYear()) else: tmp += "AND %s <= %s " % (field, max_val) return tmp if self.style & FB_SO: sql_filter += from_range("text", "sales order") if self.style & FB_CU: sql_filter += from_range("text", "customer") sql_filter += from_select("customer") if self.style & FB_PR: sql_filter += from_range("text", "product") sql_filter += from_select("product") if self.style & FB_MO: sql_filter += from_range("text", "mould") sql_filter += from_select("mould") if self.style & FB_OD: sql_filter += from_range("date", "order date") if self.style & FB_CD: sql_filter += from_range("date", "commit date") if self.style & FB_SM: sql_filter += from_range("text", "salesman") return sql_filter def GetCheckedFields(self): """ Gives a list of all checked SQL field names. """ fields = [] for name, check in self.checks.items(): field = self.sql_fields[name.replace("_sel", "")][0] if check.GetValue() and (field != "empty"): fields.append(field) return fields def OnListAdd(self, e): """Add a value from an edit field to a listbox""" listbox = self.ids[e.GetId()]["listbox"] edit = self.ids[e.GetId()]["textctrl"] value = edit.GetValue() if value: listbox.Append(value) def OnListDel(self, e): """Delete the selected value from a listbox""" listbox = self.ids[e.GetId()]["listbox"] edit = self.ids[e.GetId()]["textctrl"] selection = listbox.GetSelections() if selection: listbox.Delete(selection[0]) def HasChecked(self): """ True if any item is checked. """ result = False for checkbox in self.checks.itervalues(): if checkbox.GetValue(): result = True return result class GroupBox(SQLBox): def __init__(self, parent, id, label): SQLBox.__init__(self, parent, id, label) # create and add the CheckListBox self.sizer = wx.StaticBoxSizer(self, wx.VERTICAL) self.checklist = wx.CheckListBox(parent, wx.ID_ANY, choices=self.GetTitles(), style=wx.LB_SORT) self.sizer.Add(self.checklist, 1, wx.EXPAND) def GetSelection(self): """ Gives a dictionary of selected fields like: {"title": "field name"} """ count = self.checklist.GetCount() checked = {} for index in range(count): if self.checklist.IsChecked(index): key = self.checklist.GetString(index) value = self.titles[key] checked[key] = value return checked def GetString(self, get_all=False): """gives selected columns as SQL GROUP BY for output""" sql_group = "" def is_empty(name): if (name == "empty"): wx.LogError("Missing SQL field translation!") return True return False if get_all: for value in self.sql_fields.itervalues(): if is_empty(value[0]): continue sql_group += value[0] + ", " else: for name in self.GetSelection().itervalues(): if is_empty(name): continue sql_group += name + ", " if sql_group: sql_group = "GROUP BY " + sql_group.rstrip(", ") + " ORDER BY " + self.sql_fields["sales order"][0] return sql_group def HasChecked(self): """ True if any item is checked. """ return (len(self.GetSelection()) > 0) if __name__ == '__main__': App = MainApp(redirect = 0) # redirect stdio to console App.SetAppName("SoftPRO Data Collector") App.MainLoop()
Initial URL
Initial Description
This is the main file. You also need the libraries posted here for the program to work.
Initial Title
GUI Collector - Main
Initial Tags
sql, textmate, python
Initial Language
Python