#!/usr/bin/python

# Copyright (c) 2009 by Reid Priedhorsky, .
#
# This script is distributed under the terms of the GNU General Public
# License; see http://www.gnu.org/licenses/gpl.txt for more information.

"""Command-line tool that records batteries, their capacity tests and
matched battery groups in a PostgreSQL database (via psycopg2).

Run with no arguments to see the list of available actions.
"""

# NOTE: every print below is a single-argument print(...) call so that the
# script behaves identically under Python 2.6+ and Python 3.

usage = '''
Usage: %prog ACTION ARGS

All capacities are in milliamp-hours (mAh).
'''

# Per-action help text, keyed by the action name given on the command line.
# NOTE: 'rm-batt' and 'rm-test' are documented here but have no matching
# action_* method yet, so invoking them reports "Unknown action".
actions_usage = {

    'list-batts': '''

    Print the known batteries. Fields are: id, type, specified capacity,
    capacity at most recent test, and tested capacity as a percent of
    specified capacity.
''',

    'list-groups': '''

    Print the known battery groups.
''',

    'list-tests': '''[BATT_ID]

    Print the capacity test results for battery BATT_ID, or for all
    batteries if BATT_ID is omitted.
''',

    'add-batt': '''TYPE MAH_SPEC [BATT_ID]

    Create a new battery of type TYPE (string) having specification
    capacity MAH_SPEC. If BATT_ID is given, use that ID (int); otherwise,
    auto-generate an ID.
''',

    'add-test': '''BATT_ID MAH_RESULT [DATE] [REMARKS]

    Record a capacity test of battery BATT_ID. If no date given, use
    today.
''',

    'add-group': '''SIZE TYPE [GROUP_ID]

    Create a new group of SIZE batteries, automatically choosing the best
    match from unmatched batteries. If GROUP_ID is given, use that ID.
''',

    'rm-batt': '''BATT_ID

    FIXME
''',

    'rm-test': '''BATT_ID TEST_ID

    FIXME
''',

    'rm-group': '''GROUP_ID

    Delete the group GROUP_ID.
''',

}

import os.path
import psycopg2
import psycopg2.extensions
import sys

DB_CONNECT_STRING = 'dbname=batteries'


class Batt_Man(object):
    '''Command-line front end: parses sys.argv and dispatches to the
    action_* method named by the first argument.'''

    __slots__ = ('db', 'action')

    def main(self):
        '''Run the action named in sys.argv[1] with the remaining
        arguments, then commit. Exits via usage_error() if the action is
        unknown.'''
        self.action = (sys.argv[1] if (len(sys.argv) >= 2) else '')
        args = sys.argv[2:]
        # dispatch action: "add-batt" -> self.action_add_batt
        method = getattr(self, 'action_' + self.action.replace('-', '_'),
                         None)
        if (method is None):
            self.usage_error('Unknown action: %s' % (self.action))
        self.db = DB()
        method(args)
        self.db.commit()

    def action_add_batt(self, args):
        '''add-batt TYPE MAH_SPEC [BATT_ID]: insert a new battery row.'''
        (type_, mah_spec, batt_id) = self.parse_args(args, 2, 3,
                                                     (str, float, int))
        if (batt_id is None):
            # No ID given: use one past the largest existing ID (or 1).
            batt_id = self.db.sql("""SELECT coalesce(max(id)+1, 1) AS m
                                     FROM battery""")[0]['m']
        self.db.insert('battery',
                       {'id': batt_id},
                       {'type_': type_,
                        'mah_spec': mah_spec,
                        'retired': False})
        print('Created new battery %d: %s, %g mAh'
              % (batt_id, type_, mah_spec))

    def action_add_group(self, args):
        '''add-group SIZE TYPE [GROUP_ID]: create a group from the SIZE
        ungrouped batteries of type TYPE having the highest mah_result.'''
        (count, type_, group_id) = self.parse_args(args, 2, 3,
                                                   (int, str, int))
        if (group_id is None):
            group_id = self.db.sql("""SELECT coalesce(max(id)+1, 1) AS m
                                      FROM group_""")[0]['m']
        # NOTE(review): PostgreSQL sorts NULLs first under DESC; if
        # battery_status can yield a NULL mah_result (untested battery),
        # those rows would be picked first -- confirm against the view.
        bids = [row['id'] for row in self.db.sql("""
            SELECT id
            FROM battery_status
            WHERE
              type_ = %s
              AND NOT EXISTS (SELECT battery_id
                              FROM group_battery
                              WHERE battery_id = id)
            ORDER BY mah_result desc
            LIMIT %s""", (type_, count))]
        if (len(bids) != count):
            self.usage_error(
                'Not enough batteries of type %s: found %d, need %d'
                % (type_, len(bids), count))
        self.db.insert('group_', {'id': group_id}, {'count': count})
        for bid in bids:
            self.db.insert('group_battery',
                           {'group_id': group_id, 'battery_id': bid}, {})
        print('Created new group %d: %d x %s' % (group_id, count, type_))

    def action_add_test(self, args):
        '''add-test BATT_ID MAH_RESULT [DATE] [REMARKS]: record one
        capacity test; DATE defaults to today.'''
        (batt_id, mah_result, date, remarks) \
            = self.parse_args(args, 2, 4, (int, float, str, str))
        if (date is None):
            date = 'today'
        # Let the database parse the date so any format it accepts works.
        try:
            date = self.db.sql('SELECT %s::date AS date', (date,))[0]['date']
        except psycopg2.DataError:
            self.usage_error("can't parse date: %s" % (date))
        self.db.insert('test',
                       {'batt_id': batt_id,
                        'date_': date},
                       {'mah_result': mah_result,
                        'remarks': remarks})
        print('Recorded test for battery %d: %s, %g mAh%s'
              % (batt_id, date, mah_result,
                 ('' if remarks is None else ', "%s"' % (remarks))))

    def action_list_batts(self, args):
        '''list-batts: print one line per non-retired battery.'''
        self.parse_args(args, 0, 0, ())
        rows = self.db.sql("""
            SELECT
              id,
              type_,
              group_id,
              COALESCE(group_id::text, '') AS group_id_txt,
              mah_spec,
              mah_result,
              pct_spec
            FROM battery_status
            WHERE NOT retired
            ORDER BY mah_spec, type_, group_id, id""")
        print('')
        # Headers are built from the same column widths as the row format
        # below so the columns line up.
        print(' ' * 25 + 'Capacity (mAh)')
        print('%-2s %-2s %-18s %-4s %-4s %-3s'
              % ('ID', 'Gr', 'Type', 'spec', 'test', '%sp'))
        print('-- -- ------------------ ---- ---- ---')
        for row in rows:
            print('%(id)2d %(group_id_txt)2s %(type_)-18s %(mah_spec)4g '
                  '%(mah_result)4g %(pct_spec)3.0f' % row)
        print('')

    def action_list_groups(self, args):
        '''list-groups: print one summary line per battery group.'''
        self.parse_args(args, 0, 0, ())
        # FIXME: do this with SQL aggregates and GROUP BY?
        rows = self.db.sql("""
            SELECT
              b.id AS bid,
              g.id AS gid,
              type_,
              mah_spec,
              mah_result,
              pct_spec
            FROM
              battery_status b
              JOIN group_battery gb ON b.id = gb.battery_id
              JOIN group_ g ON g.id = gb.group_id
            WHERE NOT retired
            ORDER BY type_, g.id, mah_result DESC""")
        print('')
        print(' ' * 38 + 'Capacity (mAh/cell)')
        print('%-2s %-21s %-12s %-4s %-4s %-3s %-4s'
              % ('Gr', 'Type', 'Cells', 'spec', 'test', '%sp', '%err'))
        print('-- --------------------- ------------ ---- ---- --- ----')
        cells = []
        for (i, row) in enumerate(rows):
            cells.append(row['bid'])
            last_of_group = (i == len(rows) - 1
                             or row['gid'] != rows[i + 1]['gid'])
            if (not last_of_group):
                continue
            # Rows are ordered by mah_result DESC within a group, so the
            # figures printed are those of the group's last (lowest) row.
            summary = dict(row)
            summary['type_'] = row['type_'] + ' x' + str(len(cells))
            summary['cells'] = ",".join([str(c) for c in sorted(cells)])
            summary['errpct'] = 100 * self.group_error(row['gid'])
            print('%(gid)2d %(type_)-21s %(cells)-12s %(mah_spec)4g '
                  '%(mah_result)4g %(pct_spec)3.0f %(errpct)4.1f' % summary)
            cells = []

    def action_list_tests(self, args):
        '''list-tests [BATT_ID]: print the recorded tests of one battery,
        or of all non-retired batteries if BATT_ID is omitted.'''
        (batt_id,) = self.parse_args(args, 0, 1, (int,))
        # The battery ID is passed as a bound parameter; only fixed SQL
        # fragments are concatenated into the query text.
        if (batt_id is None):
            (where, parms) = ('1=1', None)
        else:
            (where, parms) = ('battery.id = %s', (batt_id,))
        rows = self.db.sql("""
            SELECT
              battery.id,
              type_,
              group_id,
              COALESCE(group_id::text, '') AS group_id_txt,
              mah_spec,
              date_,
              mah_result,
              100 * mah_result/mah_spec AS pct_spec,
              COALESCE(remarks, '') as remarks
            FROM
              battery
              JOIN test ON (battery.id = batt_id)
              LEFT OUTER JOIN group_battery bg
                ON (battery.id = bg.battery_id)
            WHERE
              NOT retired
              AND """ + where + """
            ORDER BY mah_spec, type_, group_id, id, date_, test.id""",
                           parms)
        print('')
        print(' ' * 25 + 'Capacity (mAh) ------------')
        print('%-2s %-2s %-18s %-4s %-10s %-4s %-3s %s'
              % ('ID', 'Gr', 'Type', 'spec', 'date', 'test', '%sp',
                 'remarks'))
        print('-- -- ------------------ ---- ---------- ---- --- '
              '--------------')
        for (i, row) in enumerate(rows):
            # Blank line before each battery's run of tests.
            if (i == 0 or row['id'] != rows[i - 1]['id']):
                print('')
            print('%(id)2d %(group_id_txt)2s %(type_)-18s %(mah_spec)4g '
                  '%(date_)s %(mah_result)4g %(pct_spec)3.0f %(remarks)s'
                  % row)
        print('')

    def action_rm_group(self, args):
        '''rm-group GROUP_ID: delete the group and its membership rows.
        Exits via usage_error() if no such group exists.'''
        (group_id,) = self.parse_args(args, 1, 1, (int,))
        self.db.sql("DELETE FROM group_battery WHERE group_id = %s",
                    (group_id,))
        self.db.sql("DELETE FROM group_ WHERE id = %s", (group_id,))
        # rowcount() reflects the DELETE FROM group_ just executed.
        if (self.db.rowcount() == 0):
            self.usage_error('No such group: %d' % (group_id))
        print("Group %d deleted" % (group_id))

    def group_error(self, gid):
        '''Return the matching error of group gid (i.e., the normalized
        difference between the maximum and minimum tested mAh capacities
        of cells within the group).'''
        rows = self.db.sql("""
            SELECT (max(mah_result) - min(mah_result)) / min(mah_result)
                   AS err
            FROM battery_status
            WHERE group_id = %s""", (gid,))
        return rows[0]['err']

    def parse_args(self, args, count_min, count_max, types):
        '''Validate and convert positional arguments.

        args is the list of argument strings; between count_min and
        count_max of them must be present. types[i] is the callable used
        to convert args[i]. Returns a list of length count_max in which
        omitted trailing arguments are None. Exits via usage_error() on a
        wrong argument count or a conversion ValueError.'''
        if (len(args) > count_max):
            self.usage_error('too many arguments')
        if (len(args) < count_min):
            self.usage_error('too few arguments')
        result = [None] * count_max
        for (i, arg) in enumerate(args):
            try:
                result[i] = types[i](arg)
            except ValueError as x:
                self.usage_error('Wrong type argument: %s' % (x))
        return result

    def usage_error(self, error=None):
        '''Print error (if given) and the usage text -- for the current
        action if it is a known one, otherwise for all actions -- then
        exit with status 1.'''
        if (error is not None):
            print('\nError: %s' % (error,))
        print(usage.replace('%prog', os.path.basename(sys.argv[0])))
        # self.action is a slot and may not have been assigned yet.
        action = getattr(self, 'action', None)
        if (action in actions_usage):
            print('* %s %s' % (action, actions_usage[action]))
        else:
            print('Available actions:\n')
            for name in sorted(actions_usage.keys()):
                print('* %s %s' % (name, actions_usage[name]))
        sys.exit(1)


class Usage_Error(Exception):
    pass


class DB(object):
    '''Thin wrapper around one psycopg2 connection and cursor.'''

    __slots__ = ('conn', 'curs')

    def __init__(self):
        '''Connect using DB_CONNECT_STRING and open a cursor.'''
        self.conn = psycopg2.connect(DB_CONNECT_STRING)
        # plain READ COMMITTED (the constant's value is 1)
        self.conn.set_isolation_level(
            psycopg2.extensions.ISOLATION_LEVEL_READ_COMMITTED)
        self.curs = self.conn.cursor()

    def commit(self):
        # FIXME: Do something smart if commit fails.
        self.conn.commit()

    def delete(self, table, id_cols):
        '''Delete the rows of table whose columns equal the values in the
        dict id_cols (conditions are ANDed together).'''
        conditions = ' AND '.join('%s = %%(%s)s' % (col, col)
                                  for col in id_cols.keys())
        self.sql('DELETE FROM %s WHERE %s' % (table, conditions), id_cols)

    def dict_prep(self, dict_):
        'Prepare the values in dict_ for storage.'
        for key in dict_.keys():
            # Empty string becomes None
            if (dict_[key] == ''):
                dict_[key] = None

    def insert(self, table, id_cols, nonid_cols):
        '''Insert one row into table; id_cols and nonid_cols are dicts
        mapping column name to value.'''
        # ID columns with value None are assumed to have a useful default
        # value specified in the database, so don't mention them.
        all_cols = dict()
        all_cols.update(id_cols)
        all_cols.update(nonid_cols)
        self.dict_prep(all_cols)
        insert_cols = dict()
        insert_cols.update(nonid_cols)
        for (key, value) in id_cols.items():
            if (value is not None):
                insert_cols[key] = value
        names = list(insert_cols.keys())
        sql = 'INSERT INTO %s (%s) VALUES (%s)' \
              % (table,
                 ', '.join(names),
                 ', '.join('%%(%s)s' % (name) for name in names))
        # Values come from all_cols so that dict_prep's cleanup applies.
        self.sql(sql, all_cols)

    def insert_clobber(self, table, id_cols, nonid_cols):
        '''Like insert(), but clobber any existing row with matching PK.
        Data from any existing row is NOT merged with the new data.'''
        self.delete(table, id_cols)
        self.insert(table, id_cols, nonid_cols)

    def lock_table(self, table, mode, blocking):
        '''Issue LOCK TABLE on table in the given mode; if blocking is
        false, add NOWAIT.'''
        if (blocking):
            nowait = ''
        else:
            nowait = 'nowait'
        self.sql('LOCK TABLE %s IN %s MODE %s' % (table, mode, nowait))

    def pack_result(self, desc, result):
        'Return the result set as a list of col_name => col_value dicts.'
        names = [col[0] for col in desc]
        return [dict(zip(names, row)) for row in result]

    def quoted(self, s):
        '''Return s as a quoted SQL string literal; None becomes the
        empty literal.'''
        if (s is None):
            return "''"
        return str(psycopg2.extensions.QuotedString(s))

    def rollback(self):
        self.conn.rollback()

    def rowcount(self):
        'Return the cursor rowcount of the most recent statement.'
        return self.curs.rowcount

    def sequence_get_next(self, seq_name):
        'Return the next value of the database sequence seq_name.'
        return self.sql("SELECT nextval('%s');" % (seq_name))[0]['nextval']

    def sql(self, sql, parms=None):
        '''Execute sql with optional bound parameters parms. Return the
        rows as a list of dicts, or None if the statement produced no
        result set.'''
        #print sql
        self.curs.execute(sql, parms)
        if (self.curs.description is None):
            return None
        else:
            return self.pack_result(self.curs.description,
                                    self.curs.fetchall())

    def transaction_begin(self):
        # Apparently this is not necessary
        #self.sql("BEGIN TRANSACTION");
        self.sql('SET CONSTRAINTS ALL DEFERRED')
        self.sql('SET TRANSACTION ISOLATION LEVEL SERIALIZABLE')


#class MyOptionParser(optparse.OptionParser):
#
#   def format_help(self):
#      return self.usage.replace('%prog', self.get_prog_name())


if (__name__ == '__main__'):
    b = Batt_Man()
    b.main()