Newer
Older
import mysql.connector
from get_docker_secret import get_docker_secret
import os
# Exception thrown when no password has been defined to connect to the database
class NoPasswordDefineError(Exception):
"""No secret containing a password has been found"""
pass
__rdos_password__ = None
# Procedures that returns an externally defined password to conect to the database
# First tries to check the docker secrets, then an environment variable.
# Throws NoPasswordDefineError if no password is found
# Returns a password
def get_password():
if __rdos_password__ == None:
password = get_docker_secret('rdos_secret')
if password == None:
if 'rdos_secret' in os.environ:
password = os.environ['rdos_secret']
print("Password stored in a non secured way, please make your code safer")
else:
raise NoPasswordDefineError
return password
return __rdos_password__
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
# Connect to the local MYSQL database on lipn-rdos
def database_connection():
mydb = mysql.connector.connect(
host="192.168.90.101",
user="rdos",
password = get_password(),
database="rdos"
)
return mydb
def update_database(url:str, job_id: str, before:str, after:str):
database = database_connection()
mycursor = database.cursor(buffered=False)
with open('../database/templates/update_success.txt', 'r') as file:
update = file.read()
update = update.replace('{{before}}', before).replace('{{after}}', after)
update = update.replace('{{url}}', url)
update = update.replace('{{job_id}}', job_id)
print(update)
mycursor.execute(update)
database.commit()
print(mycursor.rowcount, "record(s) affected")
return mycursor.rowcount
# Get a row in the Job table that hasn't been computed yet.
def get_job(database):
sql_get_new_job = "SELECT jobs.id,parametersJSON,directory,email,tool,jobs.idGenerator,outputFormat,command from jobs inner join generators on jobs.idGenerator=generators.id where status='10'"
mycursor = database.cursor(buffered=True)
mycursor.execute(sql_get_new_job)
myresult = mycursor.fetchone()
mycursor.close()
return myresult
# Tells the database that the previously selected job will be taken care of by the deamon
# Returns 1 if no other deamon selected the job first, 0 otherwwise.
def apply_for_job(database, job_id: str):
mycursor = database.cursor(buffered=False)
update = "UPDATE jobs set status='100' where jobs.id='"+job_id+"' and status='10'"
mycursor.execute(update)
database.commit()
return mycursor.rowcount
# Returns a dictionary containing each generator's parameters as a key. The value associated to each key is "is this parameter's value written in the comand line or in a separated file
# in which case the value is the filename.
def getParameters(generators: dict):
database = database_connection()
parameters_basefile = {}
parameters_default = {}
for generator in generators:
parameters_basefile[generator] = {}
parameters_default[generator] = {}
sql_get_new_job = "SELECT name,basefile,defaultValue from parameters inner join generators on parameters.idGenerator=generators.id where generators.tool = '"+generator+"'"
mycursor = database.cursor(buffered=False)
mycursor.execute(sql_get_new_job)
myresult = mycursor.fetchall()
mycursor.close()
for result in myresult:
parameters_basefile[generator][result[0]] = result[1]
parameters_default[generator][result[0]] = result[1]
database.disconnect()
return (parameters_basefile, parameters_default)
# Returns a dictionary containing all database generators existing in database
# The result returned contains every generators in database with parameters and default values for each field
def get_generators():
database = database_connection()
generators = {}
sql_get_new_job = "SELECT generators.tool, name, basefile, defaultValue from parameters inner join generators on parameters.idGenerator=generators.id"
mycursor = database.cursor(buffered=False)
myresult = mycursor.fetchall()
mycursor.close()
for result in myresult:
# Insert in database a query send from client and returns insert status
def db_insert(query: dict):
if(query is not None):
database = database_connection()
if(db_job_check(query) != 0):
query['id'] = str(random.getrandbits(64))
try:
values = query.values()
cols = query.keys()
sql_new_job_row = "INSERT INTO jobs (%s) VALUES ('%s');" % (", ".join(cols), "','".join(values))
mycursor = database.cursor(buffered=True)
mycursor.execute(sql_new_job_row)
database.commit()
mycursor.close()
except mysql.connector.Error as err:
print("MYSQL Error :{}".format(err))
def db_job_check(query: dict):
if (query is not None):
db = database_connection()
try:
ret = 0
val = query['id']
sql_new_job_row = "SELECT * FROM jobs WHERE ID=('%s') LIMIT 1;" % (val)
mycursor = db.cursor(buffered=True)
mycursor.execute(sql_new_job_row)
db.commit()
ret = mycursor.rowcount
mycursor.close()
return ret
except mysql.connector.Error as err:
print("MYSQL Error :{}".format(err))