Newer
Older
import mysql.connector
from get_docker_secret import get_docker_secret
import os
# Exception thrown when no password has been defined to connect to the database
class NoPasswordDefineError(Exception):
"""No secret containing a password has been found"""
pass
__rdos_password__ = None
# Procedures that returns an externally defined password to conect to the database
# First tries to check the docker secrets, then an environment variable.
# Throws NoPasswordDefineError if no password is found
# Returns a password
def get_password():
if __rdos_password__ == None:
password = get_docker_secret('rdos_secret')
if password == None:
if 'rdos_secret' in os.environ:
password = os.environ['rdos_secret']
print("Password stored in a non secured way, please make your code safer")
else:
raise NoPasswordDefineError
return password
return __rdos_password__
# Connect to the local MYSQL database on lipn-rdos
def database_connection():
mydb = mysql.connector.connect(
host="192.168.90.101",
user="rdos",
password = get_password(),
database="rdos"
)
return mydb
def update_database(url:str, job_id: str, before:str, after:str):
database = database_connection()
mycursor = database.cursor(buffered=False)
with open('../database/templates/update_success.txt', 'r') as file:
update = file.read()
update = update.replace('{{before}}', before).replace('{{after}}', after)
update = update.replace('{{url}}', url)
update = update.replace('{{job_id}}', job_id)
print(update)
Bonjour monsieur @Pierre Boudes , s il vous plait est ce qu'il n y'a pas de nouvelles par rapport à nos résultats? et si vous pouvez nous donner une idée sur les attestations de réussite quand est ce qu'on pourra les avoir? ( les gens qui ont déjà passé leurs soutenances) it()
print(mycursor.rowcount, "record(s) affected")
return mycursor.rowcount
# Get a row in the Job table that hasn't been computed yet.
def get_job(database):
sql_get_new_job = "SELECT jobs.id,parametersJSON,directory,email,tool,jobs.idGenerator,outputFormat,command from jobs inner join generators on jobs.idGenerator=generators.id where status='10'"
mycursor = database.cursor(buffered=True)
mycursor.execute(sql_get_new_job)
myresult = mycursor.fetchone()
mycursor.close()
return myresult
# Tells the database that the previously selected job will be taken care of by the deamon
# Returns 1 if no other deamon selected the job first, 0 otherwwise.
def apply_for_job(database, job_id: str):
mycursor = database.cursor(buffered=False)
update = "UPDATE jobs set status='100' where jobs.id='"+job_id+"' and status='10'"
mycursor.execute(update)
database.commit()
return mycursor.rowcount
# Returns a dictionary containing each generator's parameters as a key. The value associated to each key is "is this parameter's value written in the comand line or in a separated file
# in which case the value is the filename.
def getParameters(generators: dict):
database = database_connection()
parameters_basefile = {}
parameters_default = {}
for generator in generators:
parameters_basefile[generator] = {}
parameters_default[generator] = {}
sql_get_new_job = "SELECT name,basefile,defaultValue from parameters inner join generators on parameters.idGenerator=generators.id where generators.tool = '"+generator+"'"
mycursor = database.cursor(buffered=False)
mycursor.execute(sql_get_new_job)
myresult = mycursor.fetchall()
mycursor.close()
for result in myresult:
parameters_basefile[generator][result[0]] = result[1]
parameters_default[generator][result[0]] = result[1]
database.disconnect()
return (parameters_basefile, parameters_default)
# Returns a dictionary containing all database generators existing in database
# The result returned contains every generators in database with parameters and default values for each field
def get_generators():
database = database_connection()
generators = {}
sql_get_new_job = "SELECT generators.tool, name, basefile, defaultValue from parameters inner join generators on parameters.idGenerator=generators.id"
mycursor = database.cursor(buffered=False)
myresult = mycursor.fetchall()
mycursor.close()
for result in myresult:
# Insert in database a query send from client and returns insert status
def db_insert(query: dict):
if(query is not None):
database = database_connection()
if(db_job_check(query) != 0):
query['id'] = str(random.getrandbits(64))
try:
values = query.values()
cols = query.keys()
sql_new_job_row = "INSERT INTO jobs (%s) VALUES ('%s');" % (", ".join(cols), "','".join(values))
mycursor = database.cursor(buffered=True)
mycursor.execute(sql_new_job_row)
database.commit()
mycursor.close()
except mysql.connector.Error as err:
print("MYSQL Error :{}".format(err))
def db_job_check(query: dict):
if (query is not None):
db = database_connection()
try:
ret = 0
val = query['id']
sql_new_job_row = "SELECT * FROM jobs WHERE ID=('%s') LIMIT 1;" % (val)
mycursor = db.cursor(buffered=True)
mycursor.execute(sql_new_job_row)
db.commit()
ret = mycursor.rowcount
mycursor.close()
return ret
except mysql.connector.Error as err:
print("MYSQL Error :{}".format(err))