/usr/lib/python3/dist-packages/skytools/adminscript.py is in python3-skytools 3.3-2.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
"""Admin scripting.
"""
# allow getargspec
# pylint:disable=deprecated-method
from __future__ import division, absolute_import, print_function
import sys
import inspect
import skytools
__all__ = ['AdminScript']
class AdminScript(skytools.DBScript):
    """Contains common admin script tools.

    Second argument (first is .ini file) is taken as command
    name.  If class method 'cmd_' + arg exists, it is called,
    otherwise error is given.
    """

    # Names of subcommands that must run without a pidfile.  Only
    # membership is tested, so a subclass may override this with any
    # container (dict, set, tuple, ...).
    commands_without_pidfile = {}

    def __init__(self, service_name, args):
        """AdminScript init.

        Passes ``service_name`` and ``args`` to the base-class initializer,
        then requires that a subcommand is present in ``self.args``
        (presumably the positional arguments parsed by the base class --
        confirm against skytools.DBScript).  Logs an error and exits with
        status 1 when the subcommand is missing.

        The pidfile is dropped for commands listed in
        ``commands_without_pidfile``; otherwise a configured pidfile gets
        an ".admin" suffix.
        """
        super(AdminScript, self).__init__(service_name, args)

        if len(self.args) < 2:
            self.log.error("need command")
            sys.exit(1)

        cmd = self.args[1]
        if cmd in self.commands_without_pidfile:
            self.pidfile = None

        if self.pidfile:
            self.pidfile = self.pidfile + ".admin"

    def work(self):
        """Non-looping work function, calls command function.

        Maps the subcommand ``self.args[1]`` to the method
        ``cmd_<name>`` (dashes become underscores) and calls it with the
        remaining arguments.  Logs an error and exits with status 1 when
        the method does not exist or, for methods without ``*args``, when
        the number of arguments does not match the number of parameters.
        """
        self.set_single_loop(1)

        cmd = self.args[1]
        cmdargs = self.args[2:]

        # find function
        fname = "cmd_" + cmd.replace('-', '_')
        if not hasattr(self, fname):
            self.log.error('bad subcommand, see --help for usage')
            sys.exit(1)
        fn = getattr(self, fname)

        # check if correct number of arguments
        #
        # inspect.getargspec() was removed in Python 3.11 and rejects
        # functions with annotations or keyword-only arguments on older
        # Python 3.  Prefer getfullargspec(); fall back to getargspec()
        # only where the former does not exist (Python 2).  Both return
        # a named tuple starting with (args, varargs, ...), and both
        # include the bound 'self' in args.
        getspec = getattr(inspect, 'getfullargspec', None) or inspect.getargspec
        spec = getspec(fn)
        args, varargs = spec[0], spec[1]

        n_args = len(args) - 1  # drop 'self'
        if varargs is None and n_args != len(cmdargs):
            helpstr = ""
            if n_args:
                helpstr = ": " + " ".join(args[1:])
            self.log.error("command '%s' got %d args, but expects %d%s",
                           cmd, len(cmdargs), n_args, helpstr)
            sys.exit(1)

        # run command
        fn(*cmdargs)

    def fetch_list(self, db, sql, args, keycol=None):
        """Fetch a resultset from db, optionally turning it into value list.

        Executes ``sql`` with ``args`` on a new cursor of ``db``, fetches
        all rows and commits.  When ``keycol`` is truthy, returns the list
        of ``row[keycol]`` values; otherwise (including ``None``, ``''``
        and ``0``) returns the rows unchanged.
        """
        curs = db.cursor()
        curs.execute(sql, args)
        rows = curs.fetchall()
        db.commit()
        if not keycol:
            return rows
        return [r[keycol] for r in rows]

    def display_table(self, db, desc, sql, args=(), fields=(), fieldfmt=None):
        """Display multirow query as a table.

        Executes ``sql`` with ``args`` and prints the result to stdout as
        left-aligned columns.

        ``desc``, when truthy, is printed as a title line before the
        header.  ``fields`` selects and orders the columns by key; when
        empty, every column from the cursor description is shown.
        ``fieldfmt`` is an optional mapping of field name to a
        one-argument callable applied to that field's value before it is
        printed.

        Returns 0 without printing anything when the query yields no
        rows, 1 otherwise.
        """
        self.log.debug("display_table: %s", skytools.quote_statement(sql, args))
        curs = db.cursor()
        curs.execute(sql, args)
        rows = curs.fetchall()
        db.commit()
        if not rows:
            return 0
        if not fieldfmt:
            fieldfmt = {}

        if not fields:
            fields = [f[0] for f in curs.description]

        # Each column is at least 15 characters wide and grows to fit the
        # longest value.  Widths are measured on the raw values, before
        # fieldfmt is applied.  Only None counts as empty: other falsy
        # values (0, False, zero decimals) still occupy space on screen.
        widths = [15] * len(fields)
        for row in rows:
            for i, k in enumerate(fields):
                val = row[k]
                rlen = 0 if val is None else len(str(val))
                widths[i] = max(widths[i], rlen)
        # two spaces of padding between columns
        widths = [w + 2 for w in widths]

        # All columns but the last are padded; the last one is left
        # unpadded so lines carry no trailing whitespace.
        fmt = ''.join('%%-%ds' % w for w in widths[:-1]) + '%s'

        if desc:
            print(desc)
        print(fmt % tuple(fields))
        print(fmt % tuple(['-' * (w - 2) for w in widths]))

        for row in rows:
            vals = []
            for field in fields:
                val = row[field]
                if field in fieldfmt:
                    val = fieldfmt[field](val)
                vals.append(val)
            print(fmt % tuple(vals))
        # print() adds its own newline, so this leaves two blank lines
        # after the table.
        print('\n')
        return 1

    def exec_stmt(self, db, sql, args):
        """Run regular non-query SQL on db.

        Logs the quoted statement at debug level, executes it with
        ``args`` on a new cursor and commits.  Returns nothing.
        """
        self.log.debug("exec_stmt: %s", skytools.quote_statement(sql, args))
        curs = db.cursor()
        curs.execute(sql, args)
        db.commit()

    def exec_query(self, db, sql, args):
        """Run regular query SQL on db.

        Logs the quoted statement at debug level, executes it with
        ``args`` on a new cursor, fetches all rows, commits and returns
        the fetched rows.
        """
        self.log.debug("exec_query: %s", skytools.quote_statement(sql, args))
        curs = db.cursor()
        curs.execute(sql, args)
        res = curs.fetchall()
        db.commit()
        return res
|