Viewing file: openprinting.py (16.57 KB) -rw-r--r-- Select action/file-type: (+) | (+) | (+) | Code (+) | Session (+) | (+) | SDB (+) | (+) | (+) | (+) | (+) | (+) |
#!/usr/bin/env python
## system-config-printer
## Copyright (C) 2008 Red Hat, Inc. ## Copyright (C) 2008 Till Kamppeter <till.kamppeter@gmail.com>
## This program is free software; you can redistribute it and/or modify ## it under the terms of the GNU General Public License as published by ## the Free Software Foundation; either version 2 of the License, or ## (at your option) any later version.
## This program is distributed in the hope that it will be useful, ## but WITHOUT ANY WARRANTY; without even the implied warranty of ## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the ## GNU General Public License for more details.
## You should have received a copy of the GNU General Public License ## along with this program; if not, write to the Free Software ## Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
import urllib, httplib, platform, threading, tempfile, traceback import os, sys from xml.etree.ElementTree import XML from . import Device
# Public names of this module: only the OpenPrinting class is exported
# by "from ... import *"; every other definition here is private.
__all__ = ['OpenPrinting']
def _normalize_space (text):
    """
    Tidy up the whitespace of a text snippet.

    Leading and trailing whitespace is stripped, every newline is turned
    into a space, and each run of consecutive spaces is reduced to a
    single space.  Other whitespace inside the text (such as tabs) is
    left untouched.

    @type text: string
    @param text: text to normalize
    @return: the normalized text
    """
    flattened = text.strip ().replace ('\n', ' ')
    # Splitting on a single space produces an empty string for every
    # extra space in a run; dropping those and re-joining collapses each
    # run in one pass, so there is no loop that could fail to terminate.
    return ' '.join ([word for word in flattened.split (' ') if word])
class _QueryThread (threading.Thread):
    """
    Daemon thread that sends one POST request to the query CGI script of
    the parent's server and reports the outcome through a callback.
    """

    def __init__ (self, parent, parameters, callback, user_data=None):
        """
        @param parent: object providing base_url (the server host) and
        language (a sequence whose first element is sent as the UI
        language and locale)
        @type parameters: dict
        @param parameters: URL parameters
        @type callback: function
        @param callback: called as callback (status, user_data, result)
        once the query has finished; status is zero for success.  It may
        be set to None at any time to suppress the call.
        @param user_data: passed through to the callback unchanged
        """
        threading.Thread.__init__ (self)
        self.parent = parent
        self.parameters = parameters
        self.callback = callback
        self.user_data = user_data

        self.setDaemon (True)

    def run (self):
        """
        Perform the query and invoke the callback (if still set).

        The callback receives status 0 and the response body for an
        HTTP 200 reply, the HTTP status code and None for any other
        reply, or status 1 and a sys.exc_info () tuple if the request
        raised an exception.
        """
        # CGI script to be executed
        query_command = "/query.cgi"

        # Headers for the post request
        headers = {"Content-type": "application/x-www-form-urlencoded",
                   "Accept": "text/plain"}
        language = self.parent.language[0]
        params = ("%s&uilanguage=%s&locale=%s" %
                  (urllib.urlencode (self.parameters), language, language))
        self.url = "http://%s%s?%s" % (self.parent.base_url,
                                       query_command, params)

        # Send request
        result = None
        status = 1
        try:
            conn = httplib.HTTPConnection (self.parent.base_url)
            try:
                conn.request ("POST", query_command, params, headers)
                resp = conn.getresponse ()
                status = resp.status
                if status == 200:
                    result = resp.read ()
            finally:
                # Release the socket even when the request failed.
                conn.close ()
        except Exception:
            # The HTTP status may already have been recorded as 200
            # (e.g. reading the body failed); force a failure status so
            # the exception is never reported as a successful result.
            status = 1
            result = sys.exc_info ()

        if status == 200:
            status = 0

        # Read the attribute once: cancelOperation () may reset it to
        # None from another thread between a check and the call.
        callback = self.callback
        if callback is not None:
            callback (status, self.user_data, result)
class OpenPrinting:
    """
    Runs printer and driver queries against the OpenPrinting web site
    and hands the parsed replies to caller-supplied callbacks.
    """

    def __init__(self, language=None):
        """
        @param language: language, as given by the first element of
        locale.setlocale().  When omitted, the result of
        locale.getlocale (locale.LC_MESSAGES) is used, or 'C' if the
        locale cannot be determined.
        @type language: string
        """
        if language is None:
            import locale
            try:
                language = locale.getlocale (locale.LC_MESSAGES)
            except (locale.Error, ValueError):
                # getlocale () raises ValueError for a locale name it
                # cannot parse; treat that like any other locale error.
                language = 'C'
        self.language = language

        # XXX Read configuration file.
        self.base_url = "www.openprinting.org"

        # Restrictions on driver choices XXX Parameters to be taken from
        # config file
        self.onlyfree = 1
        self.onlymanufacturer = 0

    def cancelOperation(self, handle):
        """
        Cancel an operation.

        @param handle: query/operation handle
        """
        # Just prevent the callback.
        try:
            handle.callback = None
        except Exception:
            # Best effort only: a handle that cannot take the attribute
            # has no callback to suppress.
            pass

    def webQuery(self, parameters, callback, user_data=None):
        """
        Run a web query for a driver.

        @type parameters: dict
        @param parameters: URL parameters
        @type callback: function
        @param callback: callback function, taking (integer, user_data,
        string) parameters with the first parameter being the status
        code, zero for success
        @return: query handle
        """
        the_thread = _QueryThread (self, parameters, callback, user_data)
        the_thread.start ()
        return the_thread

    def searchPrinters(self, searchterm, callback, user_data=None):
        """
        Search for printers using a search term.

        @type searchterm: string
        @param searchterm: search term
        @type callback: function
        @param callback: callback function, taking (integer, user_data,
        result) parameters with the first parameter being the status
        code, zero for success.  On success the result is a dict of
        foomatic_id: displayname; if the reply could not be parsed the
        status is 1 and the result is a sys.exc_info () tuple; if the
        query itself failed its status and result are passed on as-is.
        @return: query handle
        """
        # Common parameters for the request
        params = { 'type': 'printers',
                   'printer': searchterm,
                   'format': 'xml' }
        return self.webQuery (params, self._printers_reply,
                              (callback, user_data))

    def listDrivers(self, model, callback, user_data=None, extra_options=None):
        """
        Obtain a list of printer drivers.

        @type model: string or cupshelpers.Device
        @param model: foomatic printer model string or a
        cupshelpers.Device object
        @type callback: function
        @param callback: callback function, taking (integer, user_data,
        result) parameters with the first parameter being the status
        code, zero for success.  On success the result is a dict of
        foomatic_id: driver description; if the reply could not be
        parsed the status is 1 and the result is a sys.exc_info ()
        tuple; if the query itself failed its status and result are
        passed on as-is.  The callback is invoked exactly once.
        @type extra_options: string -> string dictionary
        @param extra_options: Additional search options, see
        http://www.linuxfoundation.org/en/OpenPrinting/Database/Query
        @return: query handle
        """
        if isinstance (model, Device):
            model = model.id

        params = { 'type': 'drivers',
                   'moreinfo': '1',
                   'showprinterid': '1',
                   'onlynewestdriverpackages': '1',
                   'architectures': platform.machine (),
                   'noobsoletes': '1',
                   'onlyfree': str (self.onlyfree),
                   'onlymanufacturer': str (self.onlymanufacturer),
                   'printer': model,
                   'format': 'xml'}
        if extra_options:
            params.update (extra_options)
        return self.webQuery (params, self._drivers_reply,
                              (callback, user_data))

    def _printers_reply (self, status, data, result):
        """
        Query callback for searchPrinters (): parse the reply and pass
        the outcome on to the caller's callback held in data.
        """
        (callback, user_data) = data
        if status != 0:
            callback (status, user_data, result)
            return

        try:
            printers = self._parse_printers (result)
        except Exception:
            status = 1
            printers = sys.exc_info ()

        try:
            callback (status, user_data, printers)
        except Exception:
            # Report a failing callback on standard output: the
            # traceback lines first, then the exception itself.
            (exc_type, exc_value, tb) = sys.exc_info ()
            extxt = traceback.format_exception_only (exc_type, exc_value)
            for line in traceback.format_tb (tb):
                print (line.strip ())
            print (extxt[0].strip ())

    def _parse_printers (self, result):
        """
        Turn the XML reply of a printer search into a dict of
        foomatic_id: displayname.  Entries lacking an id, make or model
        (or with any of them empty) are left out.
        """
        printers = {}
        for printer in XML (result).findall ("printer"):
            id_element = printer.find ("id")
            make = printer.find ("make")
            model = printer.find ("model")
            if id_element is None or make is None or model is None:
                continue

            if id_element.text and make.text and model.text:
                printers[id_element.text] = make.text + " " + model.text

        return printers

    def _drivers_reply (self, status, data, result):
        """
        Query callback for listDrivers (): parse the reply and pass the
        outcome on to the caller's callback held in data.
        """
        (callback, user_data) = data
        if status != 0:
            # The query failed, so there is nothing to parse; stop here
            # so that the callback is not invoked a second time.
            callback (status, user_data, result)
            return

        try:
            drivers = self._parse_drivers (result)
        except Exception:
            callback (1, user_data, sys.exc_info ())
        else:
            # Called outside the try body so that an exception raised by
            # the callback is not mistaken for a parsing failure.
            callback (0, user_data, drivers)

    def _parse_drivers (self, result):
        """
        Turn the XML reply of a driver query into a dict of
        foomatic_id: driver description (see _parse_driver).  Drivers
        without an id attribute, a name or a URL are left out.
        """
        drivers = {}
        for driver in XML (result).findall ('driver'):
            driver_id = driver.attrib.get ('id')
            if driver_id is None:
                continue

            description = self._parse_driver (driver)
            if description is not None:
                drivers[driver_id] = description

        return drivers

    def _parse_driver (self, driver):
        """
        Build the description of one <driver> element, or return None if
        it has no name or no URL.
        """
        # The description is a dict of:
        #   { 'name': name,
        #     'url': url,
        #     'supplier': supplier,
        #     'license': short license string e.g. GPLv2,
        #     'licensetext': license text (Plain text),
        #     'nonfreesoftware': Boolean,
        #     'freesoftware': Boolean (opposite of 'nonfreesoftware'),
        #     'thirdpartysupplied': Boolean,
        #     'manufacturersupplied': Boolean,
        #     'patents': Boolean,
        #     'supportcontacts' (optional):
        #       list of { 'name',
        #                 'url',
        #                 'level',
        #               }
        #     'shortdescription': short description,
        #     'recommended': Boolean,
        #     'functionality' (optional):
        #       { 'text': percentage, as the text found in the XML,
        #         'lineart': likewise,
        #         'graphics': likewise,
        #         'photo': likewise,
        #         'speed': likewise,
        #       }
        #     'packages' (optional):
        #       { arch:
        #         { file:
        #           { 'url': url,
        #             'realversion': upstream version string,
        #             'version': packaged version string,
        #             'release': package release string,
        #             'pkgsys': text of the <pkgsys> element,
        #             'repositories' (optional): { tag: text }
        #           }
        #         }
        #       }
        #     'ppds' (optional):
        #       URL string list
        #   }
        # There is more information in the raw XML, but this
        # can be added to the Python structure as needed.
        info = {}
        for attribute in ['name', 'url', 'supplier', 'license',
                          'shortdescription']:
            element = driver.find (attribute)
            if element is not None and element.text is not None:
                info[attribute] = _normalize_space (element.text)

        if 'name' not in info or 'url' not in info:
            return None

        element = driver.find ('licensetext')
        if element is not None:
            info['licensetext'] = element.text

        # These flags are expressed by the mere presence of an element.
        for boolean in ['nonfreesoftware', 'recommended',
                        'patents', 'thirdpartysupplied',
                        'manufacturersupplied']:
            info[boolean] = driver.find (boolean) is not None

        # Make a 'freesoftware' tag for compatibility with
        # how the OpenPrinting API used to work (see trac
        # #74).
        info['freesoftware'] = not info['nonfreesoftware']

        supportcontacts = []
        container = driver.find ('supportcontacts')
        if container is not None:
            for sc in container.findall ('supportcontact'):
                if sc.text is not None:
                    name = _normalize_space (sc.text)
                else:
                    name = ""
                supportcontacts.append ({ 'name': name,
                                          'url': sc.attrib.get ('url'),
                                          'level': sc.attrib.get ('level') })

        if supportcontacts:
            info['supportcontacts'] = supportcontacts

        container = driver.find ('functionality')
        if container is not None:
            functionality = {}
            for attribute in ['text', 'lineart', 'graphics', 'photo',
                              'speed']:
                element = container.find (attribute)
                if element is not None:
                    functionality[attribute] = element.text
            if functionality:
                info['functionality'] = functionality

        packages = {}
        container = driver.find ('packages')
        if container is not None:
            # Each child of <packages> is named after an architecture.
            for arch in container:
                rpms = {}
                for package in arch.findall ('package'):
                    rpm = {}
                    for attribute in ['realversion', 'version',
                                      'release', 'url', 'pkgsys']:
                        element = package.find (attribute)
                        if element is not None:
                            rpm[attribute] = element.text

                    repositories = package.find ('repositories')
                    if repositories is not None:
                        for pkgsys in repositories:
                            rpm.setdefault ('repositories',
                                            {})[pkgsys.tag] = pkgsys.text

                    rpms[package.attrib['file']] = rpm
                packages[arch.tag] = rpms

        if packages:
            info['packages'] = packages

        container = driver.find ('ppds')
        if container is not None:
            ppds = [each.text for each in container]
            if ppds:
                info['ppds'] = ppds

        return info
def _simple_gui ():
    """
    Run a small PyGTK dialog for trying out OpenPrinting queries.

    The dialog has a text entry, a text view for the results, and
    "Search" and "List" buttons that call OpenPrinting.searchPrinters ()
    and OpenPrinting.listDrivers () with the entry's text.  The function
    returns when the GTK main loop is quit.
    """
    import gtk
    import pprint
    gtk.gdk.threads_init ()

    class QueryApp:
        def __init__(self):
            self.openprinting = OpenPrinting ()
            self.main = gtk.Dialog ("OpenPrinting query application",
                                    None,
                                    gtk.DIALOG_MODAL | gtk.DIALOG_NO_SEPARATOR,
                                    (gtk.STOCK_CLOSE, gtk.RESPONSE_CLOSE,
                                     "Search", 10,
                                     "List", 20))
            self.main.set_border_width (6)
            self.main.vbox.set_spacing (2)
            vbox = gtk.VBox (False, 6)
            self.main.vbox.pack_start (vbox, True, True, 0)
            vbox.set_border_width (6)
            self.entry = gtk.Entry ()
            vbox.pack_start (self.entry, False, False, 6)
            sw = gtk.ScrolledWindow ()
            self.tv = gtk.TextView ()
            sw.add (self.tv)
            vbox.pack_start (sw, True, True, 6)
            self.main.connect ("response", self.response)
            self.main.show_all ()

        def response (self, dialog, response):
            """Handle a dialog button: close, search (10) or list (20)."""
            if (response == gtk.RESPONSE_CLOSE or
                response == gtk.RESPONSE_DELETE_EVENT):
                gtk.main_quit ()

            if response == 10:
                # Run a query.
                self.openprinting.searchPrinters (self.entry.get_text (),
                                                  self.search_printers_callback)

            if response == 20:
                self.openprinting.listDrivers (self.entry.get_text (),
                                               self.list_drivers_callback)

        def _show_text (self, text):
            """Put text into the text view while holding the GDK lock."""
            gtk.gdk.threads_enter ()
            try:
                self.tv.get_buffer ().set_text (text)
            finally:
                # Always give the lock back, even if the update failed,
                # so that a failure here cannot leave it held.
                gtk.gdk.threads_leave ()

        def _raise_failure (self, status, result):
            """Raise an exception describing a failed query."""
            # A query that raised delivers a sys.exc_info () tuple, but
            # a reply with a non-200 HTTP status delivers None, which
            # cannot be indexed.
            if result is not None:
                raise result[1]
            raise RuntimeError ("query failed with status %s" % status)

        def search_printers_callback (self, status, user_data, printers):
            if status != 0:
                self._raise_failure (status, printers)

            lines = [printer + "\n" for printer in printers.values ()]
            self._show_text ("".join (lines))

        def list_drivers_callback (self, status, user_data, drivers):
            if status != 0:
                self._raise_failure (status, drivers)

            self._show_text (pprint.pformat (drivers))

        def query_callback (self, status, user_data, result):
            """Show a raw query result and save it to result.xml."""
            text = str (result)
            self._show_text (text)
            output = open ("result.xml", "w")
            try:
                output.write (text)
            finally:
                output.close ()

    # Keep a reference to the application object while the main loop runs.
    q = QueryApp ()
    gtk.main ()
|