Revision: 5277
Updated Code
at February 24, 2008 23:04 by stagger
Updated Code
#!/usr/bin/env python
# encoding: utf-8
"""
collector.py
Created by Daniel Link on 2007-12-05.
Copyright (c) 2007 Top Trend Manucfacturing Co., Ltd.. All rights reserved.
"""
import wx
import wx.lib.scrolledpanel as scrolled
import wx.lib.newevent
from wx.lib.evtmgr import eventManager
import sys
import os
import pymssql
import softpro
import validators
import string
import csv
import time
import thread
FB_SO = 1
FB_CU = 2
FB_PR = 4
FB_MO = 8
FB_OD = 16
FB_CD = 32
FB_SM = 64
# This creates a new Event class and a EVT binder function
(ThreadDone, EVT_THREAD_DONE) = wx.lib.newevent.NewEvent()
class MainApp(wx.App):
""" Collects various information from SQL server """
def OnInit(self):
""" Create main frame and initialize variables """
self.connected = False # SQL server connection status
self.verbose = False
self.main_frame = MainFrame(None, -1, "SoftPRO data collector")
self.nb = self.main_frame.notebook
self.thread_id = 1
self.threads = {}
# SQL events:
eventManager.Register(self.OnConnect, wx.EVT_BUTTON, self.nb.sql_win.connect_button)
eventManager.Register(self.OnConnect, wx.EVT_TEXT_ENTER, self.nb.sql_win.host_edit)
eventManager.Register(self.OnConnect, wx.EVT_TEXT_ENTER, self.nb.sql_win.user_edit)
eventManager.Register(self.OnConnect, wx.EVT_TEXT_ENTER, self.nb.sql_win.password_edit)
eventManager.Register(self.OnConnect, wx.EVT_TEXT_ENTER, self.nb.sql_win.database_edit)
# Job events:
eventManager.Register(self.OnOPCollect, wx.EVT_BUTTON, self.nb.pages["op_win"].collect_button)
#eventManager.Register(self.OnCostCollect, wx.EVT_BUTTON, self.nb.pages["costs_win"].collect_button)
# Thread events:
eventManager.Register(self.OnClose, wx.EVT_CLOSE, self.main_frame)
#eventManager.Register(self.OnGaugeUpdate, EVT_GAUGE_UPDATE, self.main_frame)
eventManager.Register(self.OnThreadDone, EVT_THREAD_DONE, self.main_frame)
self.main_frame.Show(True)
return True
def OnConnect(self, e):
""" Connect / Disconnect from SQL Server """
nb = self.main_frame.notebook
sql = self.main_frame.notebook.sql_win
host = sql.host_edit.GetValue()
user = sql.user_edit.GetValue()
password = sql.password_edit.GetValue()
database = sql.database_edit.GetValue()
if not self.connected: # connect
try:
self.sql_connection = pymssql.connect(host=host, user=user, password=password, database=database)
except:
wx.LogError("Connection failed.nnException: " + str(sys.exc_info()[0]))
raise
self.connected = True
for widget in sql.login_widgets:
widget.Disable()
wx.LogStatus("Connected to %s as %s" % (host, user))
sql.connect_button.SetLabel("Disconnect")
for page in nb.pages.itervalues():
page.Enable()
else: # disconnect
self.sql_connection.close()
self.connected = False
for widget in sql.login_widgets:
widget.Enable()
wx.LogStatus("Disconnected %s from %s" % (user, host))
sql.connect_button.SetLabel("Connect")
for page in nb.pages.itervalues():
page.Disable()
def OnOPCollect(self, e):
""" Collect OP strings which pass through filter and write them to disk """
dlg = wx.FileDialog(self.main_frame, message="Save file as ...",
defaultDir=os.getcwd(),
defaultFile="",
wildcard="CSV Table for Excel Import|*.csv",
style=wx.SAVE)
if dlg.ShowModal() != wx.ID_OK:
return
op_filter_box = self.nb.op_filter_box
cur = self.sql_connection.cursor()
# Disable some widgets
self.nb.pages["op_win"].collect_button.Disable()
op_filter_box.Disable()
wx.LogStatus("Searching for product variants...")
# get all sales orders that pass through filter
name = "od20sord"
t = op_filter_box.Translate(name)
query = "SELECT %s.fsono" % t
#for field in op_filter_box.GetCheckedFields():
# if (field != t + ".fsono"):
# query += ", " + op_filter_box.MakeShort(field)
query += " FROM %s %s WHERE %s.fitemno1 LIKE ' 1' " % (name, t, t)
if not op_filter_box.HasChecked():
wx.LogWarning("You have chosen not to filter output. The operation will probably take a very long time.")
else:
query += op_filter_box.GetString()
query += " GROUP BY %s.fsono" % t
wx.LogVerbose("Executing Query: %s" % query)
try:
cur.execute(query)
except:
message = "SQL Query failed. "
if wx.Log.GetVerbose():
message += "Check the log for details. Does the query correctly execute when you run it manually with Microsoft SQL Enterprise Manager?"
else:
message += "Please enable verbose output on the logging page and repeat what you just did."
wx.LogError(message + "nnException: " + str(sys.exc_info()[0]))
wx.LogStatus("")
raise
results = cur.fetchall()
if not results:
wx.LogWarning("Your query was too restrictive. The server did not return any results. Sorry.")
return
codes = []
for row in results:
codes.append(row[0])
wx.LogStatus("Finding OP codes for %d Sales Orders..." % len(codes))
wx.LogVerbose("Get some coffee! Seriously!")
# Start thread here
assert not self.threads.has_key("op"), "The OP thread seems to be running already."
self.threads["op"] = OPCollectThread(self.main_frame, self.thread_id, cur, self.nb.op_gauge, self.nb.op_remain, codes, dlg.GetPath()+".csv")
self.threads["op"].Start()
button = self.nb.pages["op_win"].collect_button
eventManager.DeregisterWindow(button)
eventManager.Register(self.OnOPAbort, wx.EVT_BUTTON, button)
button.SetLabel("Abort")
button.Enable()
self.thread_id += 1
def OnCostCollect(self, e):
"""Gives all the costs which pass through the filter"""
pass
def OnClose(self, e):
""" Stop running threads on window close """
if len(self.threads) > 0:
busy = wx.BusyInfo("Shutting down pending processes...")
for t in self.threads.itervalues():
t.Stop()
running = 1
while running:
running = 0
for t in self.threads.itervalues():
running = running + t.IsRunning()
time.sleep(0.1)
# Pass event to the next handler (and, thus, close the window)
e.Skip()
def OnThreadDone(self, e):
t = e.type
assert self.threads.has_key(t), "There is no %s thread!" % t
del self.threads[t]
button = self.nb.pages[t+"_win"].collect_button
eventManager.DeregisterWindow(button)
eventManager.Register(self.OnOPCollect, wx.EVT_BUTTON, button)
button.SetLabel("Collect")
def OnOPAbort(self, e):
assert self.threads.has_key("op"), "The process can not be aborted, because it is not running!"
self.threads["op"].Stop()
wx.LogStatus("OP Collection aborted.")
class BasicThread:
def __init__(self, parent, id, cur, gauge, remain, filename):
""" Initialize Thread
parent: where EVT_GAUGE_UPDATE shall be sent
cur: pymssql SQL connection cursor
gauge: wx.Gauge
remain: wx.StaticText for remaining time
filename: where to save the results
Derived classes _must_ set self.total in their __init__!
"""
self.parent = parent
self.cur = cur
self.gauge = gauge
self.remain = remain
self.filename = filename
self.running = False
self.keepgoing = False
self.progress = 0
def Start(self):
self.running = True
self.keepgoing = True
self.start_time = time.time()
wx.LogVerbose("Starting Thread...")
thread.start_new_thread(self.Run, ())
def Stop(self):
self.keepgoing = False
def IsRunning(self):
return self.running
def Run(self):
self.running = False
class OPCollectThread(BasicThread):
def __init__(self, parent, id, cur, gauge, remain, sorders, filename):
""" Initialize Thread
sorders: list of sales order codes
"""
BasicThread.__init__(self, parent, id, cur, gauge, remain, filename)
self.sorders = sorders
self.total = len(sorders)
wx.CallAfter(gauge.SetRange, self.total)
self.opdic = {}
def Run(self):
# collect OP codes for current sales order:
while self.keepgoing and (self.progress < self.total):
code = self.sorders[self.progress] # code of current sales order
sorder = softpro.SalesOrder(self.cur, code)
for soitem in sorder.items:
bom = soitem.bom
if bom:
opstring = soitem.get_opstring()
qty = int(soitem.bom.quantity)
if opstring:
if not self.opdic.has_key(opstring):
self.opdic[opstring] = qty
else:
self.opdic[opstring] += qty
wx.LogVerbose("%s:%s (%6d) => %s" % (code, soitem.number.lstrip(), soitem.bom.quantity, opstring))
self.progress += 1
# Update Gauge
wx.CallAfter(self.gauge.SetValue, self.progress)
# Update StaticText with remaining time
elapsed = time.time() - self.start_time
per_so = elapsed/self.progress
remain = per_so*(self.total-self.progress)
text = time.strftime("%H:%M:%S", time.gmtime(remain))
wx.CallAfter(self.remain.SetLabel, text)
# if all sales orders have been processed, write them to disk:
if (self.progress == self.total):
wx.LogVerbose("%d product variants could be identified." % len(self.opdic))
writer = csv.writer(file(self.filename, "w"), dialect="excel", delimiter=";")
position = 0
wx.LogVerbose("Writing %s rows to " % self.filename)
for op, quantity in self.opdic.items():
position += 1
writer.writerow([op, quantity])
if (position % 100 == 0):
wx.LogVerbose("%d rows written" % position)
wx.CallAfter(wx.LogStatus, "Wrote %d product variants to %s" % (position, self.filename))
# reset Gauge
wx.CallAfter(self.gauge.SetValue, 0)
# signal thread stop
self.running = False
evt = ThreadDone(type="op")
wx.PostEvent(self.parent, evt)
class MainFrame(wx.Frame):
def __init__(self,parent,id,title):
wx.Frame.__init__(self, parent, wx.ID_ANY, title, size=(800,600), style=wx.DEFAULT_FRAME_STYLE)
self.CreateStatusBar()
sizer = wx.BoxSizer(wx.VERTICAL)
self.notebook = notebook = MainNotebook(self, wx.ID_ANY)
sizer.Add(notebook, 1, wx.EXPAND)
self.SetSizer(sizer)
class MainNotebook(wx.Notebook):
def __init__(self, parent, id):
""" Add pages and disable all except SQL Connection """
wx.Notebook.__init__(self, parent, id)
self.pages = {} # for easy disabling of collector pages
self.AddGeneralPage()
self.AddOPPage()
#self.AddCostsPage()
for page in self.pages.itervalues():
page.Disable()
def AddGeneralPage(self):
""" Add Page for SQL Connection """
row = 1
self.sql_win = win = wx.Panel(self, wx.ID_ANY)
sizer = wx.BoxSizer(wx.VERTICAL)
win.login_widgets = []
login_box = wx.StaticBox(win, wx.ID_ANY, "SQL Connection")
login_box_sizer = wx.StaticBoxSizer(login_box, wx.VERTICAL)
login_sizer = wx.FlexGridSizer(cols=2, vgap=10, hgap=10)
login_sizer.AddGrowableCol(1)
# Host
text = wx.StaticText(win, wx.ID_ANY, "Host:")
win.login_widgets.append(text)
login_sizer.Add(text, 0)
sql_host = "server ip"
if (sys.platform == "darwin"): # Mac needs port number too
sql_host += ":1433"
win.host_edit = edit = wx.TextCtrl(win, wx.ID_ANY, sql_host, validator=validators.HostValidator(), style=wx.TE_PROCESS_ENTER)
win.login_widgets.append(edit)
login_sizer.Add(edit, 1, wx.EXPAND)
row += 1
# User
text = wx.StaticText(win, wx.ID_ANY, "User:")
win.login_widgets.append(text)
login_sizer.Add(text, 0)
win.user_edit = edit = wx.TextCtrl(win, wx.ID_ANY, "user name", style=wx.TE_PROCESS_ENTER)
win.login_widgets.append(edit)
login_sizer.Add(edit, 1, wx.EXPAND)
row += 1
# Password
text = wx.StaticText(win, wx.ID_ANY, "Password:")
win.login_widgets.append(text)
login_sizer.Add(text, 0)
win.password_edit = edit = wx.TextCtrl(win, wx.ID_ANY, style=wx.TE_PASSWORD | wx.TE_PROCESS_ENTER)
edit.SetFocus()
win.login_widgets.append(edit)
login_sizer.Add(edit, 1, wx.EXPAND)
row += 1
# Database
text = wx.StaticText(win, wx.ID_ANY, "Database:")
win.login_widgets.append(text)
login_sizer.Add(text, 0)
win.database_edit = edit = wx.TextCtrl(win, wx.ID_ANY, "TTM", style=wx.TE_PROCESS_ENTER)
win.login_widgets.append(edit)
login_sizer.Add(edit, 1, wx.EXPAND)
row += 1
# Events
eventManager.Register(self.OnSQLChange, wx.EVT_TEXT, win)
# Put in supersizer
login_box_sizer.Add(login_sizer, 0, wx.EXPAND)
login_box_sizer.AddSpacer((1,10))
# Connect Button
win.connect_button = button = wx.Button(win, wx.ID_ANY, "Connect")
login_box_sizer.Add(button, 0, wx.EXPAND)
# Add box sizer to window sizer
sizer.Add(login_box_sizer, 0, wx.EXPAND)
log_box = wx.StaticBox(win, wx.ID_ANY, "Logging")
log_box_sizer = wx.StaticBoxSizer(log_box, wx.VERTICAL)
# Toggle Verbose Mode
self.check_verbose = check = wx.CheckBox(win, wx.ID_ANY, "Verbose Mode")
eventManager.Register(self.OnVerboseCheck, wx.EVT_CHECKBOX, check)
log_box_sizer.Add(check, 0)
sizer.Add(log_box_sizer, 0, wx.EXPAND)
win.SetSizer(sizer)
self.AddPage(win, "General")
def AddOPPage(self):
""" Add Page for OP Sequence """
self.pages["op_win"] = win = scrolled.ScrolledPanel(self, wx.ID_ANY)
sizer = wx.BoxSizer(wx.VERTICAL)
"""
# Output options
self.op_group_box = box = GroupBox(win, wx.ID_ANY, "Output Fields")
sizer.Add(box.sizer, 0, wx.EXPAND)
"""
# Filter
self.op_filter_box = box = FilterBox(win, wx.ID_ANY, "Filter by", style=FB_SO|FB_OD|FB_CD)
sizer.Add(box.sizer, 0, wx.EXPAND)
# Gauge
subsizer = wx.BoxSizer(wx.HORIZONTAL)
self.op_gauge = gauge = wx.Gauge(win, wx.ID_ANY)
subsizer.Add(gauge, 1)
subsizer.AddSpacer((5,1))
self.op_remain = text = wx.StaticText(win, wx.ID_ANY, "00:00:00")
subsizer.Add(text, 0)
subsizer.AddSpacer((5,1))
sizer.Add(subsizer, 0, wx.EXPAND)
# Collect Button
win.collect_button = button = wx.Button(win, wx.ID_ANY, "Collect")
sizer.Add(button, 0, wx.EXPAND)
"""
# Test Button
button = wx.Button(win, wx.ID_ANY, "Print SQL Filter and Group string")
eventManager.Register(self.OnTestButton, wx.EVT_BUTTON, button)
sizer.Add(button, 0, wx.EXPAND)
"""
win.SetSizer(sizer)
win.SetupScrolling()
self.AddPage(win, "OP Sequence")
def AddCostsPage(self):
""" Add Page for Actual Costs """
self.pages["costs_win"] = win = scrolled.ScrolledPanel(self, wx.ID_ANY)
sizer = wx.BoxSizer(wx.VERTICAL)
# Output options
self.costs_group_box = box = GroupBox(win, wx.ID_ANY, "Output Fields")
sizer.Add(box.sizer, 0, wx.EXPAND)
# Filter
self.costs_filter_box = box = FilterBox(win, wx.ID_ANY, "Filter by")
sizer.Add(box.sizer, 0, wx.EXPAND)
# Gauge
self.costs_gauge = gauge = wx.Gauge(win, wx.ID_ANY)
sizer.Add(gauge, 0, wx.EXPAND)
# Collect Button
win.collect_button = button = wx.Button(win, wx.ID_ANY, "Collect")
sizer.Add(button, 0, wx.EXPAND)
win.SetSizer(sizer)
win.SetupScrolling()
self.AddPage(win, "Actual Costs")
def OnSQLChange(self, event):
""" Disable Connect button when required edit field is empty """
if event.GetString() == "":
self.connect_button.Disable()
else:
self.connect_button.Enable()
def OnVerboseCheck(self, event):
wx.Log.SetVerbose(self.check_verbose.GetValue())
def OnTestButton(self, event):
pass
class SQLBox(wx.StaticBox):
def __init__(self, parent, id, label):
wx.StaticBox.__init__(self, parent, id, label)
self.row = 0
# "internal name": ("name on server", "output title")
self.sql_fields = {"sales order": ("od20sord.fsono", "Sales Order"),
"so item": ("od20sord.fitemno1", "Sales Order Item"),
"work order": ("pd20woi1.forderno", "Work Order"),
"op seq": ("bd01stb1.fopseq", "OP Sequence"),
"op code": ("bd01stb1.fopcode", "OP Code"),
"comp code": ("bd01stb2.fcompcode", "Component Code"),
"comp qty": ("bd01stb2.fqty", "Component Quantity"),
"customer": ("empty", "Customer ID"),
"product": ("od20sord.fpdcode", "Product Code"),
"mould": ("empty", "Mould Code"),
"order date": ("od20sord.fsodate", "Order Date"),
"commit date": ("od20sord.fddate", "Commit Date"),
"order qty": ("od20sord.fqty", "Order Quantity"),
"plan qty": ("pd20woi1.fwoqty", "Plan Quantity"),
"salesman": ("empty", "Salesman ID")}
# building dict like: {"Product Code":"od20sord.fpdcode"}
self.titles = {}
for field in self.sql_fields.iterkeys():
key = self.sql_fields[field][1]
value = field
self.titles[key] = value
# building dict like: {"od20sord.fpdcode":"product"}
self.names = {}
for field in self.sql_fields.iterkeys():
key = self.sql_fields[field][0]
value = field
self.names[key] = value
# building table translator dictionary likke: {"od20sord": "t1"}
self.translator = {}
index = 1
for name in self.names.iterkeys():
if self.translator.has_key(name):
continue
table = name.rstrip(string.letters).replace(".", "")
self.translator[table] = "t%d" % index
index += 1
def GetTitles(self):
""" Gives a simple list of field descriptions """
field_names = []
for title in self.titles.iterkeys():
field_names.append(title)
return field_names
def Translate(self, table):
""" Give table's short identifier """
assert self.translator.has_key(table), "No dictionary entry for %s!" % table
return self.translator[table]
def MakeShort(self, field):
""" Replaces table in table.field with short identifier """
alphanum = string.letters + string.digits
table = field.rstrip(alphanum).replace(".", "")
table = self.Translate(table)
field = field.lstrip(alphanum)
complete = table + field
assert complete, "%s does not have the expected format "table.field""
return complete
class FilterBox(SQLBox):
def __init__(self, parent, id, label, style=FB_SO|FB_CU|FB_PR|FB_MO|FB_OD|FB_CD|FB_SM):
SQLBox.__init__(self, parent, id, label)
self.ranges = {} # {name+"_min": textctrl, name+"_max": textctrl}
self.checks = {} # {name: checkbox}
self.selects = {} # {name+"_sel": listbox}
self.ids = {} # {button_id: listbox/editctrl}
self.id = 100
self.sizer = wx.StaticBoxSizer(self, wx.VERTICAL)
self.subsizer = wx.GridBagSizer(vgap=10, hgap=10)
self.style = style
def AddRangeLine(linetype, name):
self.ids[self.id] = {}
self.ids[self.id+1] = {}
parent = self.GetParent()
title = self.sql_fields[name][1]
if linetype == "text":
self.ids[self.id]["textctrl"] = self.ranges[name+"_min"] = wx.TextCtrl(parent, wx.ID_ANY)
self.ids[self.id]["textctrl"] = self.ranges[name+"_max"] = wx.TextCtrl(parent, wx.ID_ANY)
elif linetype == "date":
self.ids[self.id]["datepickerctrl"] = self.ranges[name+"_min"] = wx.DatePickerCtrl(parent, wx.ID_ANY, style = wx.DP_DEFAULT | wx.DP_SHOWCENTURY)
self.ids[self.id+1]["datepickerctrl"] = self.ranges[name+"_max"] = wx.DatePickerCtrl(parent, wx.ID_ANY, style = wx.DP_DEFAULT | wx.DP_SHOWCENTURY)
else:
raise "Unknown type of range line: " + linetype
self.checks[name] = check = wx.CheckBox(parent, wx.ID_ANY, title)
self.subsizer.Add(check, (self.row,0))
self.subsizer.Add(wx.StaticText(parent, wx.ID_ANY, "min:"), (self.row,1))
self.subsizer.Add(self.ranges[name+"_min"], (self.row,2))
self.subsizer.Add(wx.StaticText(parent, wx.ID_ANY, "max:"), (self.row,3))
self.subsizer.Add(self.ranges[name+"_max"], (self.row,4))
self.id += 2
self.row += 1
def AddSelectLine(name):
self.ids[self.id] = {}
parent = self.GetParent()
title = "Certain %ss" % self.sql_fields[name][1]
self.checks[name+"_sel"] = check = wx.CheckBox(parent, wx.ID_ANY, title)
self.subsizer.Add(check, (self.row,0))
self.ids[self.id]["textctrl"] = self.selects[name+"_sel"] = wx.TextCtrl(parent, wx.ID_ANY)
self.subsizer.Add(self.selects[name+"_sel"], (self.row,2))
button = wx.Button(parent, self.id, "Add")
eventManager.Register(self.OnListAdd, wx.EVT_BUTTON, button)
self.subsizer.Add(button, (self.row,3))
self.ids[self.id]["listbox"] = self.selects[name] = wx.ListBox(parent, wx.ID_ANY, style=wx.LB_SINGLE | wx.LB_NEEDED_SB | wx.LB_SORT)
self.subsizer.Add(self.selects[name], (self.row,4))
button = wx.Button(parent, self.id, "Remove")
eventManager.Register(self.OnListDel, wx.EVT_BUTTON, button)
self.subsizer.Add(button, (self.row,5))
self.id += 1
self.row += 1
if style & FB_SO:
AddRangeLine("text", "sales order")
if style & FB_CU:
AddRangeLine("text", "customer")
AddSelectLine("customer")
if style & FB_PR:
AddRangeLine("text", "product")
AddSelectLine("product")
if style & FB_MO:
AddRangeLine("text", "mould")
AddSelectLine("mould")
if style & FB_OD:
AddRangeLine("date", "order date")
if style & FB_CD:
AddRangeLine("date", "commit date")
if style & FB_SM:
AddRangeLine("text", "salesman")
self.sizer.Add(self.subsizer, 0, wx.EXPAND)
def GetString(self):
"""gives selected filters as SQL conditions"""
sql_filter = ""
def from_select(name):
"""Give SQL condition from listbox items"""
if not self.checks[name+"_sel"].GetValue():
return ""
sel = self.selects[name+"_sel"]
count = sel.GetCount()
items = []
sql_filter = ""
field = self.sql_fields[name][0]
field = self.MakeShort(field)
for pos in range(count):
string = sel.GetString(pos)
items.append(string)
for item in items:
sql_filter += "OR %s LIKE %s " % (field, item)
return sql_filter
def from_range(fieldtype, name):
"Give SQL range condition from TextCtrl (edit) fields"
if not self.checks[name].GetValue():
return ""
tmp = ""
field = self.sql_fields[name][0]
field = self.MakeShort(field)
min_val = self.ranges[name+"_min"].GetValue()
if min_val:
if fieldtype == "date":
tmp += "AND DATEPART(dd, %s) >= %s " % (field, min_val.GetDay())
tmp += "AND DATEPART(mm, %s) >= %s " % (field, min_val.GetMonth())
tmp += "AND DATEPART(yyyy, %s) >= %s " % (field, min_val.GetYear())
else:
tmp += "AND %s >= %s " % (field, min_val)
max_val = self.ranges[name+"_max"].GetValue()
if max_val:
if fieldtype == "date":
tmp += "AND DATEPART(dd, %s) <= %s " % (field, max_val.GetDay())
tmp += "AND DATEPART(mm, %s) <= %s " % (field, max_val.GetMonth())
tmp += "AND DATEPART(yyyy, %s) <= %s " % (field, max_val.GetYear())
else:
tmp += "AND %s <= %s " % (field, max_val)
return tmp
if self.style & FB_SO:
sql_filter += from_range("text", "sales order")
if self.style & FB_CU:
sql_filter += from_range("text", "customer")
sql_filter += from_select("customer")
if self.style & FB_PR:
sql_filter += from_range("text", "product")
sql_filter += from_select("product")
if self.style & FB_MO:
sql_filter += from_range("text", "mould")
sql_filter += from_select("mould")
if self.style & FB_OD:
sql_filter += from_range("date", "order date")
if self.style & FB_CD:
sql_filter += from_range("date", "commit date")
if self.style & FB_SM:
sql_filter += from_range("text", "salesman")
return sql_filter
def GetCheckedFields(self):
""" Gives a list of all checked SQL field names. """
fields = []
for name, check in self.checks.items():
field = self.sql_fields[name.replace("_sel", "")][0]
if check.GetValue() and (field != "empty"):
fields.append(field)
return fields
def OnListAdd(self, e):
"""Add a value from an edit field to a listbox"""
listbox = self.ids[e.GetId()]["listbox"]
edit = self.ids[e.GetId()]["textctrl"]
value = edit.GetValue()
if value:
listbox.Append(value)
def OnListDel(self, e):
"""Delete the selected value from a listbox"""
listbox = self.ids[e.GetId()]["listbox"]
edit = self.ids[e.GetId()]["textctrl"]
selection = listbox.GetSelections()
if selection:
listbox.Delete(selection[0])
def HasChecked(self):
""" True if any item is checked. """
result = False
for checkbox in self.checks.itervalues():
if checkbox.GetValue():
result = True
return result
class GroupBox(SQLBox):
def __init__(self, parent, id, label):
SQLBox.__init__(self, parent, id, label)
# create and add the CheckListBox
self.sizer = wx.StaticBoxSizer(self, wx.VERTICAL)
self.checklist = wx.CheckListBox(parent, wx.ID_ANY, choices=self.GetTitles(), style=wx.LB_SORT)
self.sizer.Add(self.checklist, 1, wx.EXPAND)
def GetSelection(self):
""" Gives a dictionary of selected fields like: {"title": "field name"} """
count = self.checklist.GetCount()
checked = {}
for index in range(count):
if self.checklist.IsChecked(index):
key = self.checklist.GetString(index)
value = self.titles[key]
checked[key] = value
return checked
def GetString(self, get_all=False):
"""gives selected columns as SQL GROUP BY for output"""
sql_group = ""
def is_empty(name):
if (name == "empty"):
wx.LogError("Missing SQL field translation!")
return True
return False
if get_all:
for value in self.sql_fields.itervalues():
if is_empty(value[0]):
continue
sql_group += value[0] + ", "
else:
for name in self.GetSelection().itervalues():
if is_empty(name):
continue
sql_group += name + ", "
if sql_group:
sql_group = "GROUP BY " + sql_group.rstrip(", ") + " ORDER BY " + self.sql_fields["sales order"][0]
return sql_group
def HasChecked(self):
""" True if any item is checked. """
return (len(self.GetSelection()) > 0)
if __name__ == '__main__':
App = MainApp(redirect = 0) # redirect stdio to console
App.SetAppName("SoftPRO Data Collector")
App.MainLoop()
Revision: 5276
Initial Code
Initial URL
Initial Description
Initial Title
Initial Tags
Initial Language
at February 24, 2008 23:00 by stagger
Initial Code
#!/usr/bin/env python
# encoding: utf-8
"""
collector.py
Created by Daniel Link on 2007-12-05.
Copyright (c) 2007 Top Trend Manucfacturing Co., Ltd.. All rights reserved.
"""
import wx
import wx.lib.scrolledpanel as scrolled
import wx.lib.newevent
from wx.lib.evtmgr import eventManager
import sys
import os
import pymssql
import softpro
import validators
import string
import csv
import time
import thread
FB_SO = 1
FB_CU = 2
FB_PR = 4
FB_MO = 8
FB_OD = 16
FB_CD = 32
FB_SM = 64
# This creates a new Event class and a EVT binder function
(ThreadDone, EVT_THREAD_DONE) = wx.lib.newevent.NewEvent()
class MainApp(wx.App):
""" Collects various information from SQL server """
def OnInit(self):
""" Create main frame and initialize variables """
self.connected = False # SQL server connection status
self.verbose = False
self.main_frame = MainFrame(None, -1, "SoftPRO data collector")
self.nb = self.main_frame.notebook
self.thread_id = 1
self.threads = {}
# SQL events:
eventManager.Register(self.OnConnect, wx.EVT_BUTTON, self.nb.sql_win.connect_button)
eventManager.Register(self.OnConnect, wx.EVT_TEXT_ENTER, self.nb.sql_win.host_edit)
eventManager.Register(self.OnConnect, wx.EVT_TEXT_ENTER, self.nb.sql_win.user_edit)
eventManager.Register(self.OnConnect, wx.EVT_TEXT_ENTER, self.nb.sql_win.password_edit)
eventManager.Register(self.OnConnect, wx.EVT_TEXT_ENTER, self.nb.sql_win.database_edit)
# Job events:
eventManager.Register(self.OnOPCollect, wx.EVT_BUTTON, self.nb.pages["op_win"].collect_button)
#eventManager.Register(self.OnCostCollect, wx.EVT_BUTTON, self.nb.pages["costs_win"].collect_button)
# Thread events:
eventManager.Register(self.OnClose, wx.EVT_CLOSE, self.main_frame)
#eventManager.Register(self.OnGaugeUpdate, EVT_GAUGE_UPDATE, self.main_frame)
eventManager.Register(self.OnThreadDone, EVT_THREAD_DONE, self.main_frame)
self.main_frame.Show(True)
return True
def OnConnect(self, e):
""" Connect / Disconnect from SQL Server """
nb = self.main_frame.notebook
sql = self.main_frame.notebook.sql_win
host = sql.host_edit.GetValue()
user = sql.user_edit.GetValue()
password = sql.password_edit.GetValue()
database = sql.database_edit.GetValue()
if not self.connected: # connect
try:
self.sql_connection = pymssql.connect(host=host, user=user, password=password, database=database)
except:
wx.LogError("Connection failed.nnException: " + str(sys.exc_info()[0]))
raise
self.connected = True
for widget in sql.login_widgets:
widget.Disable()
wx.LogStatus("Connected to %s as %s" % (host, user))
sql.connect_button.SetLabel("Disconnect")
for page in nb.pages.itervalues():
page.Enable()
else: # disconnect
self.sql_connection.close()
self.connected = False
for widget in sql.login_widgets:
widget.Enable()
wx.LogStatus("Disconnected %s from %s" % (user, host))
sql.connect_button.SetLabel("Connect")
for page in nb.pages.itervalues():
page.Disable()
def OnOPCollect(self, e):
""" Collect OP strings which pass through filter and write them to disk """
dlg = wx.FileDialog(self.main_frame, message="Save file as ...",
defaultDir=os.getcwd(),
defaultFile="",
wildcard="CSV Table for Excel Import|*.csv",
style=wx.SAVE)
if dlg.ShowModal() != wx.ID_OK:
return
op_filter_box = self.nb.op_filter_box
cur = self.sql_connection.cursor()
# Disable some widgets
self.nb.pages["op_win"].collect_button.Disable()
op_filter_box.Disable()
wx.LogStatus("Searching for product variants...")
# get all sales orders that pass through filter
name = "od20sord"
t = op_filter_box.Translate(name)
query = "SELECT %s.fsono" % t
#for field in op_filter_box.GetCheckedFields():
# if (field != t + ".fsono"):
# query += ", " + op_filter_box.MakeShort(field)
query += " FROM %s %s WHERE %s.fitemno1 LIKE ' 1' " % (name, t, t)
if not op_filter_box.HasChecked():
wx.LogWarning("You have chosen not to filter output. The operation will probably take a very long time.")
else:
query += op_filter_box.GetString()
query += " GROUP BY %s.fsono" % t
wx.LogVerbose("Executing Query: %s" % query)
try:
cur.execute(query)
except:
message = "SQL Query failed. "
if wx.Log.GetVerbose():
message += "Check the log for details. Does the query correctly execute when you run it manually with Microsoft SQL Enterprise Manager?"
else:
message += "Please enable verbose output on the logging page and repeat what you just did."
wx.LogError(message + "nnException: " + str(sys.exc_info()[0]))
wx.LogStatus("")
raise
results = cur.fetchall()
if not results:
wx.LogWarning("Your query was too restrictive. The server did not return any results. Sorry.")
return
codes = []
for row in results:
codes.append(row[0])
wx.LogStatus("Finding OP codes for %d Sales Orders..." % len(codes))
wx.LogVerbose("Get some coffee! Seriously!")
# Start thread here
assert not self.threads.has_key("op"), "The OP thread seems to be running already."
self.threads["op"] = OPCollectThread(self.main_frame, self.thread_id, cur, self.nb.op_gauge, self.nb.op_remain, codes, dlg.GetPath()+".csv")
self.threads["op"].Start()
button = self.nb.pages["op_win"].collect_button
eventManager.DeregisterWindow(button)
eventManager.Register(self.OnOPAbort, wx.EVT_BUTTON, button)
button.SetLabel("Abort")
button.Enable()
self.thread_id += 1
def OnCostCollect(self, e):
"""Gives all the costs which pass through the filter"""
pass
def OnClose(self, e):
""" Stop running threads on window close """
if len(self.threads) > 0:
busy = wx.BusyInfo("Shutting down pending processes...")
for t in self.threads.itervalues():
t.Stop()
running = 1
while running:
running = 0
for t in self.threads.itervalues():
running = running + t.IsRunning()
time.sleep(0.1)
# Pass event to the next handler (and, thus, close the window)
e.Skip()
def OnThreadDone(self, e):
t = e.type
assert self.threads.has_key(t), "There is no %s thread!" % t
del self.threads[t]
button = self.nb.pages[t+"_win"].collect_button
eventManager.DeregisterWindow(button)
eventManager.Register(self.OnOPCollect, wx.EVT_BUTTON, button)
button.SetLabel("Collect")
def OnOPAbort(self, e):
assert self.threads.has_key("op"), "The process can not be aborted, because it is not running!"
self.threads["op"].Stop()
wx.LogStatus("OP Collection aborted.")
class BasicThread:
def __init__(self, parent, id, cur, gauge, remain, filename):
""" Initialize Thread
parent: where EVT_GAUGE_UPDATE shall be sent
cur: pymssql SQL connection cursor
gauge: wx.Gauge
remain: wx.StaticText for remaining time
filename: where to save the results
Derived classes _must_ set self.total in their __init__!
"""
self.parent = parent
self.cur = cur
self.gauge = gauge
self.remain = remain
self.filename = filename
self.running = False
self.keepgoing = False
self.progress = 0
def Start(self):
self.running = True
self.keepgoing = True
self.start_time = time.time()
wx.LogVerbose("Starting Thread...")
thread.start_new_thread(self.Run, ())
def Stop(self):
self.keepgoing = False
def IsRunning(self):
return self.running
def Run(self):
self.running = False
class OPCollectThread(BasicThread):
def __init__(self, parent, id, cur, gauge, remain, sorders, filename):
""" Initialize Thread
sorders: list of sales order codes
"""
BasicThread.__init__(self, parent, id, cur, gauge, remain, filename)
self.sorders = sorders
self.total = len(sorders)
wx.CallAfter(gauge.SetRange, self.total)
self.opdic = {}
def Run(self):
# collect OP codes for current sales order:
while self.keepgoing and (self.progress < self.total):
code = self.sorders[self.progress] # code of current sales order
sorder = softpro.SalesOrder(self.cur, code)
for soitem in sorder.items:
bom = soitem.bom
if bom:
opstring = soitem.get_opstring()
qty = int(soitem.bom.quantity)
if opstring:
if not self.opdic.has_key(opstring):
self.opdic[opstring] = qty
else:
self.opdic[opstring] += qty
wx.LogVerbose("%s:%s (%6d) => %s" % (code, soitem.number.lstrip(), soitem.bom.quantity, opstring))
self.progress += 1
# Update Gauge
wx.CallAfter(self.gauge.SetValue, self.progress)
# Update StaticText with remaining time
elapsed = time.time() - self.start_time
per_so = elapsed/self.progress
remain = per_so*(self.total-self.progress)
text = time.strftime("%H:%M:%S", time.gmtime(remain))
wx.CallAfter(self.remain.SetLabel, text)
# if all sales orders have been processed, write them to disk:
if (self.progress == self.total):
wx.LogVerbose("%d product variants could be identified." % len(self.opdic))
writer = csv.writer(file(self.filename, "w"), dialect="excel", delimiter=";")
position = 0
wx.LogVerbose("Writing %s rows to " % self.filename)
for op, quantity in self.opdic.items():
position += 1
writer.writerow([op, quantity])
if (position % 100 == 0):
wx.LogVerbose("%d rows written" % position)
wx.CallAfter(wx.LogStatus, "Wrote %d product variants to %s" % (position, self.filename))
# reset Gauge
wx.CallAfter(self.gauge.SetValue, 0)
# signal thread stop
self.running = False
evt = ThreadDone(type="op")
wx.PostEvent(self.parent, evt)
class MainFrame(wx.Frame):
def __init__(self,parent,id,title):
wx.Frame.__init__(self, parent, wx.ID_ANY, title, size=(800,600), style=wx.DEFAULT_FRAME_STYLE)
self.CreateStatusBar()
sizer = wx.BoxSizer(wx.VERTICAL)
self.notebook = notebook = MainNotebook(self, wx.ID_ANY)
sizer.Add(notebook, 1, wx.EXPAND)
self.SetSizer(sizer)
class MainNotebook(wx.Notebook):
def __init__(self, parent, id):
""" Add pages and disable all except SQL Connection """
wx.Notebook.__init__(self, parent, id)
self.pages = {} # for easy disabling of collector pages
self.AddGeneralPage()
self.AddOPPage()
#self.AddCostsPage()
for page in self.pages.itervalues():
page.Disable()
def AddGeneralPage(self):
""" Add Page for SQL Connection """
row = 1
self.sql_win = win = wx.Panel(self, wx.ID_ANY)
sizer = wx.BoxSizer(wx.VERTICAL)
win.login_widgets = []
login_box = wx.StaticBox(win, wx.ID_ANY, "SQL Connection")
login_box_sizer = wx.StaticBoxSizer(login_box, wx.VERTICAL)
login_sizer = wx.FlexGridSizer(cols=2, vgap=10, hgap=10)
login_sizer.AddGrowableCol(1)
# Host
text = wx.StaticText(win, wx.ID_ANY, "Host:")
win.login_widgets.append(text)
login_sizer.Add(text, 0)
sql_host = "server ip"
if (sys.platform == "darwin"): # Mac needs port number too
sql_host += ":1433"
win.host_edit = edit = wx.TextCtrl(win, wx.ID_ANY, sql_host, validator=validators.HostValidator(), style=wx.TE_PROCESS_ENTER)
win.login_widgets.append(edit)
login_sizer.Add(edit, 1, wx.EXPAND)
row += 1
# User
text = wx.StaticText(win, wx.ID_ANY, "User:")
win.login_widgets.append(text)
login_sizer.Add(text, 0)
win.user_edit = edit = wx.TextCtrl(win, wx.ID_ANY, "user name", style=wx.TE_PROCESS_ENTER)
win.login_widgets.append(edit)
login_sizer.Add(edit, 1, wx.EXPAND)
row += 1
# Password
text = wx.StaticText(win, wx.ID_ANY, "Password:")
win.login_widgets.append(text)
login_sizer.Add(text, 0)
win.password_edit = edit = wx.TextCtrl(win, wx.ID_ANY, style=wx.TE_PASSWORD | wx.TE_PROCESS_ENTER)
edit.SetFocus()
win.login_widgets.append(edit)
login_sizer.Add(edit, 1, wx.EXPAND)
row += 1
# Database
text = wx.StaticText(win, wx.ID_ANY, "Database:")
win.login_widgets.append(text)
login_sizer.Add(text, 0)
win.database_edit = edit = wx.TextCtrl(win, wx.ID_ANY, "TTM", style=wx.TE_PROCESS_ENTER)
win.login_widgets.append(edit)
login_sizer.Add(edit, 1, wx.EXPAND)
row += 1
# Events
eventManager.Register(self.OnSQLChange, wx.EVT_TEXT, win)
# Put in supersizer
login_box_sizer.Add(login_sizer, 0, wx.EXPAND)
login_box_sizer.AddSpacer((1,10))
# Connect Button
win.connect_button = button = wx.Button(win, wx.ID_ANY, "Connect")
login_box_sizer.Add(button, 0, wx.EXPAND)
# Add box sizer to window sizer
sizer.Add(login_box_sizer, 0, wx.EXPAND)
log_box = wx.StaticBox(win, wx.ID_ANY, "Logging")
log_box_sizer = wx.StaticBoxSizer(log_box, wx.VERTICAL)
# Toggle Verbose Mode
self.check_verbose = check = wx.CheckBox(win, wx.ID_ANY, "Verbose Mode")
eventManager.Register(self.OnVerboseCheck, wx.EVT_CHECKBOX, check)
log_box_sizer.Add(check, 0)
sizer.Add(log_box_sizer, 0, wx.EXPAND)
win.SetSizer(sizer)
self.AddPage(win, "General")
def AddOPPage(self):
""" Add Page for OP Sequence """
self.pages["op_win"] = win = scrolled.ScrolledPanel(self, wx.ID_ANY)
sizer = wx.BoxSizer(wx.VERTICAL)
"""
# Output options
self.op_group_box = box = GroupBox(win, wx.ID_ANY, "Output Fields")
sizer.Add(box.sizer, 0, wx.EXPAND)
"""
# Filter
self.op_filter_box = box = FilterBox(win, wx.ID_ANY, "Filter by", style=FB_SO|FB_OD|FB_CD)
sizer.Add(box.sizer, 0, wx.EXPAND)
# Gauge
subsizer = wx.BoxSizer(wx.HORIZONTAL)
self.op_gauge = gauge = wx.Gauge(win, wx.ID_ANY)
subsizer.Add(gauge, 1)
subsizer.AddSpacer((5,1))
self.op_remain = text = wx.StaticText(win, wx.ID_ANY, "00:00:00")
subsizer.Add(text, 0)
subsizer.AddSpacer((5,1))
sizer.Add(subsizer, 0, wx.EXPAND)
# Collect Button
win.collect_button = button = wx.Button(win, wx.ID_ANY, "Collect")
sizer.Add(button, 0, wx.EXPAND)
"""
# Test Button
button = wx.Button(win, wx.ID_ANY, "Print SQL Filter and Group string")
eventManager.Register(self.OnTestButton, wx.EVT_BUTTON, button)
sizer.Add(button, 0, wx.EXPAND)
"""
win.SetSizer(sizer)
win.SetupScrolling()
self.AddPage(win, "OP Sequence")
def AddCostsPage(self):
""" Add Page for Actual Costs """
self.pages["costs_win"] = win = scrolled.ScrolledPanel(self, wx.ID_ANY)
sizer = wx.BoxSizer(wx.VERTICAL)
# Output options
self.costs_group_box = box = GroupBox(win, wx.ID_ANY, "Output Fields")
sizer.Add(box.sizer, 0, wx.EXPAND)
# Filter
self.costs_filter_box = box = FilterBox(win, wx.ID_ANY, "Filter by")
sizer.Add(box.sizer, 0, wx.EXPAND)
# Gauge
self.costs_gauge = gauge = wx.Gauge(win, wx.ID_ANY)
sizer.Add(gauge, 0, wx.EXPAND)
# Collect Button
win.collect_button = button = wx.Button(win, wx.ID_ANY, "Collect")
sizer.Add(button, 0, wx.EXPAND)
win.SetSizer(sizer)
win.SetupScrolling()
self.AddPage(win, "Actual Costs")
def OnSQLChange(self, event):
""" Disable Connect button when required edit field is empty """
if event.GetString() == "":
self.connect_button.Disable()
else:
self.connect_button.Enable()
def OnVerboseCheck(self, event):
wx.Log.SetVerbose(self.check_verbose.GetValue())
def OnTestButton(self, event):
pass
class SQLBox(wx.StaticBox):
def __init__(self, parent, id, label):
wx.StaticBox.__init__(self, parent, id, label)
self.row = 0
# "internal name": ("name on server", "output title")
self.sql_fields = {"sales order": ("od20sord.fsono", "Sales Order"),
"so item": ("od20sord.fitemno1", "Sales Order Item"),
"work order": ("pd20woi1.forderno", "Work Order"),
"op seq": ("bd01stb1.fopseq", "OP Sequence"),
"op code": ("bd01stb1.fopcode", "OP Code"),
"comp code": ("bd01stb2.fcompcode", "Component Code"),
"comp qty": ("bd01stb2.fqty", "Component Quantity"),
"customer": ("empty", "Customer ID"),
"product": ("od20sord.fpdcode", "Product Code"),
"mould": ("empty", "Mould Code"),
"order date": ("od20sord.fsodate", "Order Date"),
"commit date": ("od20sord.fddate", "Commit Date"),
"order qty": ("od20sord.fqty", "Order Quantity"),
"plan qty": ("pd20woi1.fwoqty", "Plan Quantity"),
"salesman": ("empty", "Salesman ID")}
# building dict like: {"Product Code":"od20sord.fpdcode"}
self.titles = {}
for field in self.sql_fields.iterkeys():
key = self.sql_fields[field][1]
value = field
self.titles[key] = value
# building dict like: {"od20sord.fpdcode":"product"}
self.names = {}
for field in self.sql_fields.iterkeys():
key = self.sql_fields[field][0]
value = field
self.names[key] = value
# building table translator dictionary likke: {"od20sord": "t1"}
self.translator = {}
index = 1
for name in self.names.iterkeys():
if self.translator.has_key(name):
continue
table = name.rstrip(string.letters).replace(".", "")
self.translator[table] = "t%d" % index
index += 1
def GetTitles(self):
""" Gives a simple list of field descriptions """
field_names = []
for title in self.titles.iterkeys():
field_names.append(title)
return field_names
def Translate(self, table):
""" Give table's short identifier """
assert self.translator.has_key(table), "No dictionary entry for %s!" % table
return self.translator[table]
def MakeShort(self, field):
""" Replaces table in table.field with short identifier """
alphanum = string.letters + string.digits
table = field.rstrip(alphanum).replace(".", "")
table = self.Translate(table)
field = field.lstrip(alphanum)
complete = table + field
assert complete, "%s does not have the expected format "table.field""
return complete
class FilterBox(SQLBox):
def __init__(self, parent, id, label, style=FB_SO|FB_CU|FB_PR|FB_MO|FB_OD|FB_CD|FB_SM):
SQLBox.__init__(self, parent, id, label)
self.ranges = {} # {name+"_min": textctrl, name+"_max": textctrl}
self.checks = {} # {name: checkbox}
self.selects = {} # {name+"_sel": listbox}
self.ids = {} # {button_id: listbox/editctrl}
self.id = 100
self.sizer = wx.StaticBoxSizer(self, wx.VERTICAL)
self.subsizer = wx.GridBagSizer(vgap=10, hgap=10)
self.style = style
def AddRangeLine(linetype, name):
self.ids[self.id] = {}
self.ids[self.id+1] = {}
parent = self.GetParent()
title = self.sql_fields[name][1]
if linetype == "text":
self.ids[self.id]["textctrl"] = self.ranges[name+"_min"] = wx.TextCtrl(parent, wx.ID_ANY)
self.ids[self.id]["textctrl"] = self.ranges[name+"_max"] = wx.TextCtrl(parent, wx.ID_ANY)
elif linetype == "date":
self.ids[self.id]["datepickerctrl"] = self.ranges[name+"_min"] = wx.DatePickerCtrl(parent, wx.ID_ANY, style = wx.DP_DEFAULT | wx.DP_SHOWCENTURY)
self.ids[self.id+1]["datepickerctrl"] = self.ranges[name+"_max"] = wx.DatePickerCtrl(parent, wx.ID_ANY, style = wx.DP_DEFAULT | wx.DP_SHOWCENTURY)
else:
raise "Unknown type of range line: " + linetype
self.checks[name] = check = wx.CheckBox(parent, wx.ID_ANY, title)
self.subsizer.Add(check, (self.row,0))
self.subsizer.Add(wx.StaticText(parent, wx.ID_ANY, "min:"), (self.row,1))
self.subsizer.Add(self.ranges[name+"_min"], (self.row,2))
self.subsizer.Add(wx.StaticText(parent, wx.ID_ANY, "max:"), (self.row,3))
self.subsizer.Add(self.ranges[name+"_max"], (self.row,4))
self.id += 2
self.row += 1
def AddSelectLine(name):
self.ids[self.id] = {}
parent = self.GetParent()
title = "Certain %ss" % self.sql_fields[name][1]
self.checks[name+"_sel"] = check = wx.CheckBox(parent, wx.ID_ANY, title)
self.subsizer.Add(check, (self.row,0))
self.ids[self.id]["textctrl"] = self.selects[name+"_sel"] = wx.TextCtrl(parent, wx.ID_ANY)
self.subsizer.Add(self.selects[name+"_sel"], (self.row,2))
button = wx.Button(parent, self.id, "Add")
eventManager.Register(self.OnListAdd, wx.EVT_BUTTON, button)
self.subsizer.Add(button, (self.row,3))
self.ids[self.id]["listbox"] = self.selects[name] = wx.ListBox(parent, wx.ID_ANY, style=wx.LB_SINGLE | wx.LB_NEEDED_SB | wx.LB_SORT)
self.subsizer.Add(self.selects[name], (self.row,4))
button = wx.Button(parent, self.id, "Remove")
eventManager.Register(self.OnListDel, wx.EVT_BUTTON, button)
self.subsizer.Add(button, (self.row,5))
self.id += 1
self.row += 1
if style & FB_SO:
AddRangeLine("text", "sales order")
if style & FB_CU:
AddRangeLine("text", "customer")
AddSelectLine("customer")
if style & FB_PR:
AddRangeLine("text", "product")
AddSelectLine("product")
if style & FB_MO:
AddRangeLine("text", "mould")
AddSelectLine("mould")
if style & FB_OD:
AddRangeLine("date", "order date")
if style & FB_CD:
AddRangeLine("date", "commit date")
if style & FB_SM:
AddRangeLine("text", "salesman")
self.sizer.Add(self.subsizer, 0, wx.EXPAND)
def GetString(self):
"""gives selected filters as SQL conditions"""
sql_filter = ""
def from_select(name):
"""Give SQL condition from listbox items"""
if not self.checks[name+"_sel"].GetValue():
return ""
sel = self.selects[name+"_sel"]
count = sel.GetCount()
items = []
sql_filter = ""
field = self.sql_fields[name][0]
field = self.MakeShort(field)
for pos in range(count):
string = sel.GetString(pos)
items.append(string)
for item in items:
sql_filter += "OR %s LIKE %s " % (field, item)
return sql_filter
def from_range(fieldtype, name):
"Give SQL range condition from TextCtrl (edit) fields"
if not self.checks[name].GetValue():
return ""
tmp = ""
field = self.sql_fields[name][0]
field = self.MakeShort(field)
min_val = self.ranges[name+"_min"].GetValue()
if min_val:
if fieldtype == "date":
tmp += "AND DATEPART(dd, %s) >= %s " % (field, min_val.GetDay())
tmp += "AND DATEPART(mm, %s) >= %s " % (field, min_val.GetMonth())
tmp += "AND DATEPART(yyyy, %s) >= %s " % (field, min_val.GetYear())
else:
tmp += "AND %s >= %s " % (field, min_val)
max_val = self.ranges[name+"_max"].GetValue()
if max_val:
if fieldtype == "date":
tmp += "AND DATEPART(dd, %s) <= %s " % (field, max_val.GetDay())
tmp += "AND DATEPART(mm, %s) <= %s " % (field, max_val.GetMonth())
tmp += "AND DATEPART(yyyy, %s) <= %s " % (field, max_val.GetYear())
else:
tmp += "AND %s <= %s " % (field, max_val)
return tmp
if self.style & FB_SO:
sql_filter += from_range("text", "sales order")
if self.style & FB_CU:
sql_filter += from_range("text", "customer")
sql_filter += from_select("customer")
if self.style & FB_PR:
sql_filter += from_range("text", "product")
sql_filter += from_select("product")
if self.style & FB_MO:
sql_filter += from_range("text", "mould")
sql_filter += from_select("mould")
if self.style & FB_OD:
sql_filter += from_range("date", "order date")
if self.style & FB_CD:
sql_filter += from_range("date", "commit date")
if self.style & FB_SM:
sql_filter += from_range("text", "salesman")
return sql_filter
def GetCheckedFields(self):
""" Gives a list of all checked SQL field names. """
fields = []
for name, check in self.checks.items():
field = self.sql_fields[name.replace("_sel", "")][0]
if check.GetValue() and (field != "empty"):
fields.append(field)
return fields
def OnListAdd(self, e):
"""Add a value from an edit field to a listbox"""
listbox = self.ids[e.GetId()]["listbox"]
edit = self.ids[e.GetId()]["textctrl"]
value = edit.GetValue()
if value:
listbox.Append(value)
def OnListDel(self, e):
"""Delete the selected value from a listbox"""
listbox = self.ids[e.GetId()]["listbox"]
edit = self.ids[e.GetId()]["textctrl"]
selection = listbox.GetSelections()
if selection:
listbox.Delete(selection[0])
def HasChecked(self):
""" True if any item is checked. """
result = False
for checkbox in self.checks.itervalues():
if checkbox.GetValue():
result = True
return result
class GroupBox(SQLBox):
def __init__(self, parent, id, label):
SQLBox.__init__(self, parent, id, label)
# create and add the CheckListBox
self.sizer = wx.StaticBoxSizer(self, wx.VERTICAL)
self.checklist = wx.CheckListBox(parent, wx.ID_ANY, choices=self.GetTitles(), style=wx.LB_SORT)
self.sizer.Add(self.checklist, 1, wx.EXPAND)
def GetSelection(self):
""" Gives a dictionary of selected fields like: {"title": "field name"} """
count = self.checklist.GetCount()
checked = {}
for index in range(count):
if self.checklist.IsChecked(index):
key = self.checklist.GetString(index)
value = self.titles[key]
checked[key] = value
return checked
def GetString(self, get_all=False):
"""gives selected columns as SQL GROUP BY for output"""
sql_group = ""
def is_empty(name):
if (name == "empty"):
wx.LogError("Missing SQL field translation!")
return True
return False
if get_all:
for value in self.sql_fields.itervalues():
if is_empty(value[0]):
continue
sql_group += value[0] + ", "
else:
for name in self.GetSelection().itervalues():
if is_empty(name):
continue
sql_group += name + ", "
if sql_group:
sql_group = "GROUP BY " + sql_group.rstrip(", ") + " ORDER BY " + self.sql_fields["sales order"][0]
return sql_group
def HasChecked(self):
""" True if any item is checked. """
return (len(self.GetSelection()) > 0)
if __name__ == '__main__':
App = MainApp(redirect = 0) # redirect stdio to console
App.SetAppName("SoftPRO Data Collector")
App.MainLoop()
Initial URL
Initial Description
This is the main file. You also need the libraries posted here for the program to work.
Initial Title
GUI Collector - Main
Initial Tags
sql, textmate, python
Initial Language
Python