diff options
author | Pawel Hajdan, Jr <phajdan.jr@gentoo.org> | 2012-05-30 16:34:40 +0200 |
---|---|---|
committer | Pawel Hajdan, Jr <phajdan.jr@gentoo.org> | 2012-05-30 16:34:40 +0200 |
commit | 520c9782541d2e3fa509b1a2d470889a6d26bef7 (patch) | |
tree | 61ee7afa1b00bdc49d59e65a500ff5229f68b0c7 /third_party/pybugz-0.9.3/bugz/cli.py | |
parent | Process stabilization candidates incrementally, (diff) | |
download | arch-tools-520c9782541d2e3fa509b1a2d470889a6d26bef7.tar.gz arch-tools-520c9782541d2e3fa509b1a2d470889a6d26bef7.tar.bz2 arch-tools-520c9782541d2e3fa509b1a2d470889a6d26bef7.zip |
Make bugzilla-viewer and maintainer-timeout work
by bundling old pybugz.
Diffstat (limited to 'third_party/pybugz-0.9.3/bugz/cli.py')
-rw-r--r-- | third_party/pybugz-0.9.3/bugz/cli.py | 607 |
1 files changed, 607 insertions, 0 deletions
diff --git a/third_party/pybugz-0.9.3/bugz/cli.py b/third_party/pybugz-0.9.3/bugz/cli.py new file mode 100644 index 0000000..35bf98e --- /dev/null +++ b/third_party/pybugz-0.9.3/bugz/cli.py @@ -0,0 +1,607 @@ +#!/usr/bin/env python + +import commands +import locale +import os +import re +import sys +import tempfile +import textwrap + +from urlparse import urljoin + +try: + import readline +except ImportError: + readline = None + +from bugzilla import Bugz +from config import config + +BUGZ_COMMENT_TEMPLATE = \ +""" +BUGZ: --------------------------------------------------- +%s +BUGZ: Any line beginning with 'BUGZ:' will be ignored. +BUGZ: --------------------------------------------------- +""" + +DEFAULT_NUM_COLS = 80 + +# +# Auxiliary functions +# + +def raw_input_block(): + """ Allows multiple line input until a Ctrl+D is detected. + + @rtype: string + """ + target = '' + while True: + try: + line = raw_input() + target += line + '\n' + except EOFError: + return target + +# +# This function was lifted from Bazaar 1.9. +# +def terminal_width(): + """Return estimated terminal width.""" + if sys.platform == 'win32': + return win32utils.get_console_size()[0] + width = DEFAULT_NUM_COLS + try: + import struct, fcntl, termios + s = struct.pack('HHHH', 0, 0, 0, 0) + x = fcntl.ioctl(1, termios.TIOCGWINSZ, s) + width = struct.unpack('HHHH', x)[1] + except IOError: + pass + if width <= 0: + try: + width = int(os.environ['COLUMNS']) + except: + pass + if width <= 0: + width = DEFAULT_NUM_COLS + + return width + +def launch_editor(initial_text, comment_from = '',comment_prefix = 'BUGZ:'): + """Launch an editor with some default text. + + Lifted from Mercurial 0.9. + @rtype: string + """ + (fd, name) = tempfile.mkstemp("bugz") + f = os.fdopen(fd, "w") + f.write(comment_from) + f.write(initial_text) + f.close() + + editor = (os.environ.get("BUGZ_EDITOR") or + os.environ.get("EDITOR")) + if editor: + result = os.system("%s \"%s\"" % (editor, name)) + if result != 0: + raise RuntimeError('Unable to launch editor: %s' % editor) + + new_text = open(name).read() + new_text = re.sub('(?m)^%s.*\n' % comment_prefix, '', new_text) + os.unlink(name) + return new_text + + return '' + +def block_edit(comment, comment_from = ''): + editor = (os.environ.get('BUGZ_EDITOR') or + os.environ.get('EDITOR')) + + if not editor: + print comment + ': (Press Ctrl+D to end)' + new_text = raw_input_block() + return new_text + + initial_text = '\n'.join(['BUGZ: %s'%line for line in comment.split('\n')]) + new_text = launch_editor(BUGZ_COMMENT_TEMPLATE % initial_text, comment_from) + + if new_text.strip(): + return new_text + else: + return '' + +# +# Bugz specific exceptions +# + +class BugzError(Exception): + pass + +class PrettyBugz(Bugz): + def __init__(self, base, user = None, password =None, forget = False, + columns = 0, encoding = '', skip_auth = False, + quiet = False, httpuser = None, httppassword = None ): + + self.quiet = quiet + self.columns = columns or terminal_width() + + Bugz.__init__(self, base, user, password, forget, skip_auth, httpuser, httppassword) + + self.log("Using %s " % self.base) + + if not encoding: + try: + self.enc = locale.getdefaultlocale()[1] + except: + self.enc = 'utf-8' + + if not self.enc: + self.enc = 'utf-8' + else: + self.enc = encoding + + def log(self, status_msg, newline = True): + if not self.quiet: + if newline: + print ' * %s' % status_msg + else: + print ' * %s' % status_msg, + + def warn(self, warn_msg): + if not self.quiet: + print ' ! Warning: %s' % warn_msg + + def get_input(self, prompt): + return raw_input(prompt) + + def search(self, **kwds): + """Performs a search on the bugzilla database with the keywords given on the title (or the body if specified). + """ + search_term = ' '.join(kwds['terms']).strip() + del kwds['terms'] + show_status = kwds['show_status'] + del kwds['show_status'] + show_url = kwds['show_url'] + del kwds['show_url'] + search_opts = sorted([(opt, val) for opt, val in kwds.items() + if val is not None and opt != 'order']) + + if not (search_term or search_opts): + raise BugzError('Please give search terms or options.') + + if search_term: + log_msg = 'Searching for \'%s\' ' % search_term + else: + log_msg = 'Searching for bugs ' + + if search_opts: + self.log(log_msg + 'with the following options:') + for opt, val in search_opts: + self.log(' %-20s = %s' % (opt, val)) + else: + self.log(log_msg) + + result = Bugz.search(self, search_term, **kwds) + + if result is None: + raise RuntimeError('Failed to perform search') + + if len(result) == 0: + self.log('No bugs found.') + return + + self.listbugs(result, show_url, show_status) + + def namedcmd(self, command, show_status=False, show_url=False): + """Run a command stored in Bugzilla by name.""" + log_msg = 'Running namedcmd \'%s\''%command + result = Bugz.namedcmd(self, command) + if result is None: + raise RuntimeError('Failed to run command\nWrong namedcmd perhaps?') + + if len(result) == 0: + self.log('No result from command') + return + + self.listbugs(result, show_url, show_status) + + def get(self, bugid, comments = True, attachments = True): + """ Fetch bug details given the bug id """ + self.log('Getting bug %s ..' % bugid) + + result = Bugz.get(self, bugid) + + if result is None: + raise RuntimeError('Bug %s not found' % bugid) + + # Print out all the fields below by extract the text + # directly from the tag, and just ignore if we don't + # see the tag. + FIELDS = ( + ('short_desc', 'Title'), + ('assigned_to', 'Assignee'), + ('creation_ts', 'Reported'), + ('delta_ts', 'Updated'), + ('bug_status', 'Status'), + ('resolution', 'Resolution'), + ('bug_file_loc', 'URL'), + ('bug_severity', 'Severity'), + ('priority', 'Priority'), + ('reporter', 'Reporter'), + ) + + MORE_FIELDS = ( + ('product', 'Product'), + ('component', 'Component'), + ('status_whiteboard', 'Whiteboard'), + ('keywords', 'Keywords'), + ) + + for field, name in FIELDS + MORE_FIELDS: + try: + value = result.find('.//%s' % field).text + if value is None: + continue + except AttributeError: + continue + print '%-12s: %s' % (name, value.encode(self.enc)) + + # Print out the cc'ed people + cced = result.findall('.//cc') + for cc in cced: + print '%-12s: %s' % ('CC', cc.text) + + # print out depends + dependson = ', '.join([d.text for d in result.findall('.//dependson')]) + blocked = ', '.join([d.text for d in result.findall('.//blocked')]) + if dependson: + print '%-12s: %s' % ('DependsOn', dependson) + if blocked: + print '%-12s: %s' % ('Blocked', blocked) + + bug_comments = result.findall('.//long_desc') + bug_attachments = result.findall('.//attachment') + + print '%-12s: %d' % ('Comments', len(bug_comments)) + print '%-12s: %d' % ('Attachments', len(bug_attachments)) + print + + if attachments: + for attachment in bug_attachments: + aid = attachment.find('.//attachid').text + desc = attachment.find('.//desc').text + when = attachment.find('.//date').text + print '[Attachment] [%s] [%s]' % (aid, desc.encode(self.enc)) + + if comments: + i = 0 + wrapper = textwrap.TextWrapper(width = self.columns) + for comment in bug_comments: + try: + who = comment.find('.//who').text.encode(self.enc) + except AttributeError: + # Novell doesn't use 'who' on xml + who = "" + when = comment.find('.//bug_when').text.encode(self.enc) + what = comment.find('.//thetext').text + print '\n[Comment #%d] %s : %s' % (i, who, when) + print '-' * (self.columns - 1) + + if what is None: + what = '' + + # print wrapped version + for line in what.split('\n'): + if len(line) < self.columns: + print line.encode(self.enc) + else: + for shortline in wrapper.wrap(line): + print shortline.encode(self.enc) + i += 1 + print + + def post(self, product = None, component = None, + title = None, description = None, assigned_to = None, + cc = None, url = None, keywords = None, + description_from = None, prodversion = None, append_command = None, + dependson = None, blocked = None, batch = False, + default_confirm = 'y', priority = None, severity = None): + """Post a new bug""" + + # load description from file if possible + if description_from: + try: + description = open(description_from, 'r').read() + except IOError, e: + raise BugzError('Unable to read from file: %s: %s' % \ + (description_from, e)) + + if not batch: + self.log('Press Ctrl+C at any time to abort.') + + # + # Check all bug fields. + # XXX: We use "if not <field>" for mandatory fields + # and "if <field> is None" for optional ones. + # + + # check for product + if not product: + while not product or len(product) < 1: + product = self.get_input('Enter product: ') + else: + self.log('Enter product: %s' % product) + + # check for component + if not component: + while not component or len(component) < 1: + component = self.get_input('Enter component: ') + else: + self.log('Enter component: %s' % component) + + # check for version + # FIXME: This default behaviour is not too nice. + if prodversion is None: + prodversion = self.get_input('Enter version (default: unspecified): ') + else: + self.log('Enter version: %s' % prodversion) + + # check for default severity + if severity is None: + severity_msg ='Enter severity (eg. normal) (optional): ' + severity = self.get_input(severity_msg) + else: + self.log('Enter severity (optional): %s' % severity) + + # fixme: hw platform + # fixme: os + # fixme: milestone + + # check for default priority + if priority is None: + priority_msg ='Enter priority (eg. Normal) (optional): ' + priority = self.get_input(priority_msg) + else: + self.log('Enter priority (optional): %s' % priority) + + # fixme: status + + # check for default assignee + if assigned_to is None: + assigned_msg ='Enter assignee (eg. liquidx@gentoo.org) (optional): ' + assigned_to = self.get_input(assigned_msg) + else: + self.log('Enter assignee (optional): %s' % assigned_to) + + # check for CC list + if cc is None: + cc_msg = 'Enter a CC list (comma separated) (optional): ' + cc = self.get_input(cc_msg) + else: + self.log('Enter a CC list (optional): %s' % cc) + + # check for optional URL + if url is None: + url = self.get_input('Enter URL (optional): ') + else: + self.log('Enter URL (optional): %s' % url) + + # check for title + if not title: + while not title or len(title) < 1: + title = self.get_input('Enter title: ') + else: + self.log('Enter title: %s' % title) + + # check for description + if not description: + description = block_edit('Enter bug description: ') + else: + self.log('Enter bug description: %s' % description) + + if append_command is None: + append_command = self.get_input('Append the output of the following command (leave blank for none): ') + else: + self.log('Append command (optional): %s' % append_command) + + # check for Keywords list + if keywords is None: + kwd_msg = 'Enter a Keywords list (comma separated) (optional): ' + keywords = self.get_input(kwd_msg) + else: + self.log('Enter a Keywords list (optional): %s' % keywords) + + # check for bug dependencies + if dependson is None: + dependson_msg = 'Enter a list of bug dependencies (comma separated) (optional): ' + dependson = self.get_input(dependson_msg) + else: + self.log('Enter a list of bug dependencies (optional): %s' % dependson) + + # check for blocker bugs + if blocked is None: + blocked_msg = 'Enter a list of blocker bugs (comma separated) (optional): ' + blocked = self.get_input(blocked_msg) + else: + self.log('Enter a list of blocker bugs (optional): %s' % blocked) + + # fixme: groups + # append the output from append_command to the description + if append_command is not None and append_command != '': + append_command_output = commands.getoutput(append_command) + description = description + '\n\n' + '$ ' + append_command + '\n' + append_command_output + + # raise an exception if mandatory fields are not specified. + if product is None: + raise RuntimeError('Product not specified') + if component is None: + raise RuntimeError('Component not specified') + if title is None: + raise RuntimeError('Title not specified') + if description is None: + raise RuntimeError('Description not specified') + + # set optional fields to their defaults if they are not set. + if prodversion is None: + prodversion = '' + if priority is None: + priority = '' + if severity is None: + severity = '' + if assigned_to is None: + assigned_to = '' + if cc is None: + cc = '' + if url is None: + url = '' + if keywords is None: + keywords = '' + if dependson is None: + dependson = '' + if blocked is None: + blocked = '' + + # print submission confirmation + print '-' * (self.columns - 1) + print 'Product : ' + product + print 'Component : ' + component + print 'Version : ' + prodversion + print 'severity : ' + severity + # fixme: hardware + # fixme: OS + # fixme: Milestone + print 'priority : ' + priority + # fixme: status + print 'Assigned to : ' + assigned_to + print 'CC : ' + cc + print 'URL : ' + url + print 'Title : ' + title + print 'Description : ' + description + print 'Keywords : ' + keywords + print 'Depends on : ' + dependson + print 'Blocks : ' + blocked + # fixme: groups + print '-' * (self.columns - 1) + + if not batch: + if default_confirm in ['Y','y']: + confirm = raw_input('Confirm bug submission (Y/n)? ') + else: + confirm = raw_input('Confirm bug submission (y/N)? ') + if len(confirm) < 1: + confirm = default_confirm + if confirm[0] not in ('y', 'Y'): + self.log('Submission aborted') + return + + result = Bugz.post(self, product, component, title, description, url, assigned_to, cc, keywords, prodversion, dependson, blocked, priority, severity) + if result is not None and result != 0: + self.log('Bug %d submitted' % result) + else: + raise RuntimeError('Failed to submit bug') + + def modify(self, bugid, **kwds): + """Modify an existing bug (eg. adding a comment or changing resolution.)""" + if 'comment_from' in kwds: + if kwds['comment_from']: + try: + kwds['comment'] = open(kwds['comment_from'], 'r').read() + except IOError, e: + raise BugzError('Failed to get read from file: %s: %s' % \ + (comment_from, e)) + + if 'comment_editor' in kwds: + if kwds['comment_editor']: + kwds['comment'] = block_edit('Enter comment:', kwds['comment']) + del kwds['comment_editor'] + + del kwds['comment_from'] + + if 'comment_editor' in kwds: + if kwds['comment_editor']: + kwds['comment'] = block_edit('Enter comment:') + del kwds['comment_editor'] + + if kwds['fixed']: + kwds['status'] = 'RESOLVED' + kwds['resolution'] = 'FIXED' + del kwds['fixed'] + + if kwds['invalid']: + kwds['status'] = 'RESOLVED' + kwds['resolution'] = 'INVALID' + del kwds['invalid'] + result = Bugz.modify(self, bugid, **kwds) + if not result: + raise RuntimeError('Failed to modify bug') + else: + self.log('Modified bug %s with the following fields:' % bugid) + for field, value in result: + self.log(' %-12s: %s' % (field, value)) + + def attachment(self, attachid, view = False): + """ Download or view an attachment given the id.""" + self.log('Getting attachment %s' % attachid) + + result = Bugz.attachment(self, attachid) + if not result: + raise RuntimeError('Unable to get attachment') + + action = {True:'Viewing', False:'Saving'} + self.log('%s attachment: "%s"' % (action[view], result['filename'])) + safe_filename = os.path.basename(re.sub(r'\.\.', '', + result['filename'])) + + if view: + print result['fd'].read() + else: + if os.path.exists(result['filename']): + raise RuntimeError('Filename already exists') + + open(safe_filename, 'wb').write(result['fd'].read()) + + def attach(self, bugid, filename, content_type = 'text/plain', patch = False, description = None): + """ Attach a file to a bug given a filename. """ + if not os.path.exists(filename): + raise BugzError('File not found: %s' % filename) + if not description: + description = block_edit('Enter description (optional)') + result = Bugz.attach(self, bugid, filename, description, filename, + content_type, patch) + if result == True: + self.log("'%s' has been attached to bug %s" % (filename, bugid)) + else: + reason = "" + if result and result != False: + reason = "\nreason: %s" % result + raise RuntimeError("Failed to attach '%s' to bug %s%s" % (filename, + bugid, reason)) + + def listbugs(self, buglist, show_url=False, show_status=False): + x = '' + if re.search("/$", self.base) is None: + x = '/' + for row in buglist: + bugid = row['bugid'] + if show_url: + bugid = '%s%s%s?id=%s'%(self.base, x, config.urls['show'], bugid) + status = row['status'] + desc = row['desc'] + line = '%s' % (bugid) + if show_status: + line = '%s %s' % (line, status) + if row.has_key('assignee'): # Novell does not have 'assignee' field + assignee = row['assignee'].split('@')[0] + line = '%s %-20s' % (line, assignee) + + line = '%s %s' % (line, desc) + + try: + print line.encode(self.enc)[:self.columns] + except UnicodeDecodeError: + print line[:self.columns] + + self.log("%i bug(s) found." % len(buglist)) |