# Set an "active filter" (a WHERE clause), then call e.g. nuclides.half_life; the SELECT is built with that filter:
# nuclides.half_life() is a function that calls
# buildquery(nuclides.half_life, active filter)
# It should remember whether it has already been called with that filter.
#%config IPCompleter.greedy = True
import json
import os.path
import sqlite3

import ndlaborm
class Dblink:
    """Run queries produced by ``ndlaborm.Sqlbuilder`` against a SQLite file.

    The instance owns a ``sqlite3`` connection (``None`` when the database
    path does not exist) and can render query results as a JSON or CSV
    string.

    Attributes set by ``__init__``:
        connected: ``True`` when the database file was found and opened.
        lastsql: the SQL text most recently produced by ``query_build``.
    """

    def __init__(self, db_path=''):
        """Open ``db_path`` if it exists; otherwise stay disconnected.

        A missing path is reported on stdout and leaves ``connected`` False;
        no exception is raised for it.
        """
        self._sqlbuilder = ndlaborm.Sqlbuilder()
        if os.path.exists(db_path):
            self._con_lite = sqlite3.connect(db_path)
            self.connected = True
        else:
            self._con_lite = None
            self.connected = False
            print("db path not found : " + os.path.abspath(db_path))
        self.lastsql = ''

    def force_clean_query(self, force):
        """Set the ``force_clean`` flag on the underlying SQL builder."""
        self._sqlbuilder.force_clean = force

    def query_exec(self, sql):
        """Execute raw ``sql`` on the connection and return the cursor.

        Errors are not caught here; with no open connection this fails with
        ``AttributeError`` because the connection is ``None``.
        """
        return self._con_lite.execute(sql)

    def _query_exec(self, fields, conditions=""):
        """Build and run a query, retrying once with ``force_clean`` enabled.

        Returns the cursor on success, or ``None`` when both the first
        attempt and the retry fail (including when there is no connection).
        """
        try:
            return self._con_lite.execute(self.query_build(fields, conditions))
        except Exception:
            # Best effort: a failure here triggers one retry below.
            try:
                if self._sqlbuilder.force_clean:
                    # Already in force_clean mode, nothing else to try.
                    return None
                self.force_clean_query(True)
                try:
                    sql = self.query_build(fields, conditions)
                finally:
                    # Always restore the flag, even if building raised.
                    self.force_clean_query(False)
                return self._con_lite.execute(sql)
            except Exception:
                return None

    def query_desc(self, fields, conditions=""):
        """Delegate to ``Sqlbuilder.query_desc`` and return its result."""
        return self._sqlbuilder.query_desc(fields, conditions)

    def query_check(self, table, conditions=""):
        """Delegate to ``Sqlbuilder.query_check`` and return its result."""
        return self._sqlbuilder.query_check(table, conditions)

    def query_build(self, fields, conditions=""):
        """Build the SQL text, remember it in ``lastsql`` and return it."""
        self.lastsql = self._sqlbuilder.query_build(fields, conditions)
        return self.lastsql

    def _result_keys(self, result):
        """Return the column names of a cursor, in column order."""
        return [column[0] for column in result.description]

    def json_build(self, tablename, filter=''):
        """Run the query and return its rows as a JSON string.

        Returns ``None`` when the query could not be executed. The parameter
        name ``filter`` shadows the builtin but is kept for keyword callers.
        """
        return self._json_build(self._query_exec(tablename, filter))

    def _json_build(self, result):
        """Render a cursor as a JSON array of objects.

        Every value is emitted as a JSON string holding ``str(value)`` (so a
        NULL column becomes ``"None"``). Keys and values are escaped with
        ``json.dumps`` so quotes, backslashes and control characters cannot
        break the document. A falsy ``result`` is returned unchanged.
        """
        if not result:
            return result
        # ensure_ascii=False keeps non-ASCII text as-is instead of \uXXXX.
        encoded_keys = [
            json.dumps(key, ensure_ascii=False)
            for key in self._result_keys(result)
        ]
        objects = []
        for row in result:
            members = (
                (" {" if index == 0 else " ,")
                + key
                + ":"
                + json.dumps(str(row[index]), ensure_ascii=False)
                for index, key in enumerate(encoded_keys)
            )
            objects.append(" ".join(members) + "}")
        return "[" + ",".join(objects) + "]"

    def csv_build(self, tablename, filter=''):
        """Run the query and return its rows as CSV text.

        Returns ``None`` when the query could not be executed. The parameter
        name ``filter`` shadows the builtin but is kept for keyword callers.
        """
        return self._csv_build(self._query_exec(tablename, filter))

    @staticmethod
    def _csv_quote(text, sep):
        """Quote ``text`` when it holds the separator, a quote or a newline."""
        needs_quotes = (sep and sep in text) or any(
            char in text for char in '"\r\n'
        )
        if needs_quotes:
            return '"' + text.replace('"', '""') + '"'
        return text

    def _csv_build(self, result, sep=","):
        """Render a cursor as CSV: a header line, then one line per row.

        Lines are joined with ``\\n`` and there is no trailing newline.
        Values whose ``str()`` is ``'None'`` are written as empty fields.
        Fields containing the separator, a double quote or a line break are
        wrapped in double quotes with embedded quotes doubled. A falsy
        ``result`` is returned unchanged.
        """
        if not result:
            return result
        keys = self._result_keys(result)
        lines = [sep.join(self._csv_quote(key, sep) for key in keys)]
        width = len(keys)
        for row in result:
            cells = []
            for index in range(width):
                text = str(row[index])
                cells.append('' if text == 'None' else self._csv_quote(text, sep))
            lines.append(sep.join(cells))
        return "\n".join(lines)

    def data_deliverer(self, return_type, fields, condition):
        """Return the query result serialized as ``return_type``.

        ``'csv'`` and ``'json'`` produce the matching text, or a fixed error
        payload in that format when the query failed. Any other
        ``return_type`` yields the literal string ``"res"``.
        """
        if return_type == 'csv':
            res = self.csv_build(fields, condition)
            return 'message \n error' if res is None else res
        if return_type == 'json':
            res = self.json_build(fields, condition)
            return '{"message":"error"}' if res is None else res
        return "res"