'''
Created on 1 oct. 2011
@author: abderrazek boufahja
'''
from xml.dom import minidom
from difflib import ndiff
import re
import os
import sys
import psycopg2
import datetime
# --- Mail notification settings ---------------------------------------------
# NOTE(review): none of the five mail/diff settings below is referenced by the
# code visible in this file -- presumably leftovers from a related script;
# confirm before removing.
SENDMAIL = '/usr/sbin/sendmail' # sendmail location
NOTIFICATION_BY_MAIL = True # True or False
CALCULATE_DIFFERENCE = False # True or False
LIST_ADMIN_MAIL = ['abderrazek.boufahja@gmail.com']
SENDER = 'epsos-code-updater@irisa.fr'
# --- Database settings ------------------------------------------------------
# Both values are concatenated into the psycopg2 connection string
# ('dbname=... user=...') by DBDAO.
DB_TSAM_NAME = 'tsam'
DB_USER = 'gazelle'
# Folder handed to DBDAO.generateListRetrieveValueSet() when the file is run
# as a script; one '<oid>.xml' file per value set is written into it.
FOLDER_RESULT = '/home/aboufahj/workspace/pivot-code-gen/svsepsos'
##
#
# DTO (data transfer object) classes used by this script
#
##
class CodeSystem:
    '''
    DTO for one code system together with the value sets attached to it.

    Attributes:
        id: numeric identifier, 0 until a loader assigns one.
        oid, name, description: descriptive fields, '' by default.
        listValueSet: objects exposing _getAsXML() (ValueSet instances).
        listCodeSystemVersion: starts empty; not filled in by this class.
    '''

    def __init__(self):
        self.id = 0
        self.oid = ''
        self.name = ''
        self.description = ''
        self.listValueSet = []
        self.listCodeSystemVersion = []

    def __str__(self):
        '''
        Return '(id,oid,name,description)'.

        %-formatting is used instead of '+' concatenation so that a field
        holding None (for instance a NULL database column) is rendered as
        'None' instead of raising TypeError.
        '''
        return '(%s,%s,%s,%s)' % (self.id, self.oid, self.name, self.description)

    def _getAsXML(self):
        '''
        Return a newline, the _getAsXML() output of every value set in
        listValueSet (in list order), and a closing newline.
        '''
        fragments = [valset._getAsXML() for valset in self.listValueSet]
        return '\n' + ''.join(fragments) + '\n'
class CodeSystemConcept:
    '''
    Plain data holder for one code system concept.

    Every instance starts with id 0, empty text fields, no owning
    code system version and its own empty designation list.
    '''

    def __init__(self):
        # Numeric key; stays 0 until a loader assigns a real one.
        self.id = 0
        # All text fields default to the empty string.
        self.code = self.status = self.statusDate = self.definition = ''
        # Back-reference and children are filled in by whoever builds the graph.
        self.codeSystemVersion = None
        self.listDesignation = []
class CodeSystemVersion:
    '''
    Plain data holder for one version of a code system.

    Numeric fields start at 0, text fields at '', the owning code system
    at None and the concept list as a fresh empty list.
    '''

    def __init__(self):
        # Numeric identifiers.
        self.id = 0
        self.previousVersionId = 0
        # Text fields, all initialised to the empty string.
        for field in ('fullName', 'localName', 'effectiveDate', 'releaseDate',
                      'status', 'statusDate', 'description', 'copyright',
                      'source'):
            setattr(self, field, '')
        # Links to the rest of the object graph.
        self.codeSystem = None
        self.listCodeSystemConcept = []
class Designation:
    '''
    Plain data holder for one designation of a code system concept.

    The attribute name 'isPreffered' (sic) is kept as spelled because other
    code assigns to it.
    '''

    def __init__(self):
        self.id = 0
        # Text fields all start out empty.
        for field in ('designation', 'languageCode', 'type', 'status',
                      'statusDate'):
            setattr(self, field, '')
        self.isPreffered = False
        # Owning concept; set by the loader.
        self.codeSystemConcept = None

    def _getAsXML(self):
        '''
        Return the XML fragment for this designation.

        NOTE(review): the literal is a bare newline with no markup -- confirm
        whether element text was lost from this file.
        '''
        return '\n'
class TranscodingAssociation:
    '''
    Plain data holder linking a source concept to a target concept.

    Both concept references start as None; id starts at 0 and the text
    fields at ''.
    '''

    def __init__(self):
        self.id = 0
        # The two ends of the association.
        self.targetConcept = self.sourceConcept = None
        # Text fields.
        self.quality = self.status = self.statusDate = ''
class ValueSet:
    '''
    Plain data holder for one value set and its versions.

    listValueSetVersion holds objects exposing listCodeSystemConcept, whose
    items in turn expose listDesignation.
    '''

    def __init__(self):
        self.id = 0
        # Text fields.
        self.oid = self.epsosName = self.description = ''
        # Links to the rest of the object graph.
        self.parentCodeSystem = None
        self.listValueSetVersion = []

    def _getAsXML(self):
        '''
        Return a newline, the _getAsXML() output of every designation
        reachable through versions -> concepts -> designations (in that
        nesting order), and a closing newline.
        '''
        pieces = ['\n']
        for version in self.listValueSetVersion:
            for concept in version.listCodeSystemConcept:
                pieces.extend(
                    designation._getAsXML()
                    for designation in concept.listDesignation)
        pieces.append('\n')
        return ''.join(pieces)
class ValueSetVersion:
    '''
    Plain data holder for one version of a value set.

    id starts at 0, text fields at '', the two object references at None
    and the concept list as a fresh empty list.
    '''

    def __init__(self):
        self.id = 0
        # Text fields, all initialised to the empty string.
        for field in ('versionName', 'effectiveDate', 'releaseDate',
                      'status', 'statusDate', 'description'):
            setattr(self, field, '')
        # Links to the rest of the object graph.
        self.previousVersion = self.valueSet = None
        self.listCodeSystemConcept = []
##
#
# XMLDAO class used for XML parsing of the original epsos-code.xml file
#
##
class XMLDAO:
    '''
    Reads CodeSystem / ValueSet elements out of an XML document with minidom.
    '''

    def __init__(self):
        pass

    def getListCodeSystem(self, epsosFilePath):
        '''
        Parse the document and return its 'CodeSystem' elements.

        The parsed document is kept in self.dom and its first 'Codes'
        element in self.root.

        epsosFilePath -- file name or file object, as accepted by
                         minidom.parse().
        Returns the node list of every 'CodeSystem' element below the
        'Codes' element.
        Raises AttributeError when the document has no 'Codes' element
        (self.root is None in that case).
        '''
        self.dom = minidom.parse(epsosFilePath)
        self.root = self._getNode(self.dom, 'Codes')
        return self.root.getElementsByTagName('CodeSystem')

    def getListValueSetFromCodeSystem(self, codeSystemNode):
        '''Return every 'ValueSet' element below codeSystemNode.'''
        return codeSystemNode.getElementsByTagName('ValueSet')

    def _getNode(self, parent_node, name):
        '''
        Return the first element called name below parent_node, or None
        when there is no such element or parent_node cannot be searched
        (e.g. it is None).
        '''
        try:
            return parent_node.getElementsByTagName(name)[0]
        except (IndexError, AttributeError):
            # Only "not found" / "not a node" map to None; anything else
            # (KeyboardInterrupt, SystemExit, real bugs) must propagate.
            return None
##
#
# DBDAO is used to access the tsam database and load objects from it
#
##
class DBDAO:
    '''
    Loads code systems, value sets, value set versions, concepts and
    designations from the tsam database and writes one XML file per
    value set.

    All loaders share the connection stored in self.conn (opened on demand)
    instead of opening a new connection for every loaded row, and every
    query passes its id as a bound parameter.
    '''

    def __init__(self):
        pass

    def connect(self):
        '''
        Open a new connection to the database named by DB_TSAM_NAME as user
        DB_USER; store it in self.conn and a cursor on it in self.cur.
        '''
        self.conn = psycopg2.connect('dbname=' + DB_TSAM_NAME + ' user=' + DB_USER)
        self.cur = self.conn.cursor()

    def _fetchRows(self, query, params):
        '''
        Run query with the bound params on the shared connection and return
        all result rows. Connects first when no connection exists yet, so
        each loader can still be called on its own.
        '''
        if getattr(self, 'conn', None) is None:
            self.connect()
        cursor = self.conn.cursor()
        try:
            cursor.execute(query, params)
            return cursor.fetchall()
        finally:
            # Release the cursor even when the query fails.
            cursor.close()

    def loadListCodeSystem(self):
        '''
        (Re)connect and return every row of code_system as a CodeSystem,
        value sets included, sorted by oid.

        Columns are read by position: 0=id, 1=oid, 2=name, 3=description.
        '''
        self.connect()
        self.cur.execute("SELECT DISTINCT * FROM code_system;")
        res = []
        for row in self.cur.fetchall():
            toAdd = CodeSystem()
            toAdd.id = row[0]
            toAdd.oid = row[1]
            toAdd.name = row[2]
            toAdd.description = row[3]
            toAdd.listValueSet = self.loadListValueSetForCodeSystem(toAdd)
            res.append(toAdd)
        return sorted(res, key=lambda cs: cs.oid)

    def loadListValueSetForCodeSystem(self, toAdd):
        '''
        Return the value sets whose parent_code_system_id is toAdd.id,
        sorted by oid. Rows with a NULL oid are skipped. Returns [] without
        touching the database when toAdd.id is 0.

        Columns are read by position: 0=id, 1=oid, 2=epsosName,
        3=description.
        '''
        res = []
        if toAdd.id != 0:
            rows = self._fetchRows(
                'SELECT DISTINCT * FROM value_set WHERE parent_code_system_id=%s;',
                (toAdd.id,))
            for row in rows:
                valset = ValueSet()
                valset.id = row[0]
                valset.oid = row[1]
                valset.epsosName = row[2]
                valset.description = row[3]
                if valset.oid is not None:
                    valset.parentCodeSystem = toAdd
                    valset.listValueSetVersion = self.loadListValueSetVersionForValueSet(valset)
                    res.append(valset)
        return sorted(res, key=lambda vs: vs.oid)

    def loadListValueSetVersionForValueSet(self, valset):
        '''
        Return the value_set_version rows of valset whose status is
        'Current', concepts included. Returns [] when valset.id is 0.

        Columns are read by position: 0=id, 1=versionName, 2=effectiveDate,
        4=status (column 3 is not read).
        '''
        res = []
        if valset.id != 0:
            rows = self._fetchRows(
                'SELECT DISTINCT * FROM value_set_version WHERE value_set_id=%s'
                ' AND value_set_version.status=\'Current\';',
                (valset.id,))
            for row in rows:
                valsetver = ValueSetVersion()
                valsetver.id = row[0]
                valsetver.versionName = row[1]
                valsetver.effectiveDate = row[2]
                valsetver.status = row[4]
                valsetver.valueSet = valset
                valsetver.listCodeSystemConcept = self.loadListCodeSystemConceptForValueSetVersion(valsetver)
                res.append(valsetver)
        return res

    def loadListCodeSystemConceptForValueSetVersion(self, valsetver):
        '''
        Return the 'Current' concepts linked to valsetver through
        x_concept_value_set, designations included, sorted by code.
        Returns [] when valsetver.id is 0.

        Columns are read by position: 0=id, 1=code, 2=status, 3=statusDate,
        4=definition.
        '''
        res = []
        if valsetver.id != 0:
            # An earlier variant of this query also joined code_system_version
            # and required a 'Current' version of the parent code system; it
            # was disabled in favour of the simpler filter below.
            rows = self._fetchRows(
                'SELECT DISTINCT * FROM code_system_concept WHERE code_system_concept.id IN'
                ' (SELECT code_system_concept_id from x_concept_value_set'
                ' WHERE x_concept_value_set.value_set_version_id=%s)'
                ' AND code_system_concept.status=\'Current\' ;',
                (valsetver.id,))
            for row in rows:
                csc = CodeSystemConcept()
                csc.id = row[0]
                csc.code = row[1]
                csc.status = row[2]
                csc.statusDate = row[3]
                csc.definition = row[4]
                csc.listDesignation = self.loadListDesignationForCodeSystemConcept(csc)
                res.append(csc)
        return sorted(res, key=lambda concept: concept.code)

    def loadListDesignationForCodeSystemConcept(self, csc):
        '''
        Return the 'Current' English ('en') designations of csc.
        Returns [] when csc.id is 0.

        Columns are read by position: 0=id, 1=designation, 2=languageCode,
        3=type, 4=isPreffered, 5=status, 6=statusDate.
        '''
        res = []
        if csc.id != 0:
            rows = self._fetchRows(
                'SELECT DISTINCT * FROM designation WHERE (code_system_concept_id=%s'
                ' and designation.status=\'Current\''
                ' and designation.language_code=\'en\') ;',
                (csc.id,))
            for row in rows:
                des = Designation()
                des.id = row[0]
                des.designation = row[1]
                des.languageCode = row[2]
                des.type = row[3]
                des.isPreffered = row[4]
                des.status = row[5]
                des.statusDate = row[6]
                des.codeSystemConcept = csc
                res.append(des)
        return res

    def getListCodeSystemAsFullXML(self, listCodeSystem):
        '''
        Concatenate the _getAsXML() output of every code system followed by
        the language-code fragment, and return the result pretty-printed.

        NOTE(review): the wrapper literals here carry no markup, so the
        document is well formed only if the fragments themselves provide a
        root element -- confirm whether element text was lost from this file.
        '''
        parts = ['\n', ' ']
        parts.extend(cs._getAsXML() for cs in listCodeSystem)
        parts.append(self._getLanguageStringCodeAsXML())
        parts.append('')
        return self._getAsPrettyXML(''.join(parts))

    def _getLanguageStringCodeAsXML(self):
        '''Return the language-code fragment (currently a single newline).'''
        return '\n'

    def generateListRetrieveValueSet(self, folderpath):
        '''
        Load every code system and write one pretty-printed XML file per
        value set to '<folderpath>/<value set oid>.xml'.

        NOTE(review): every literal appended to the document below is empty,
        so the text handed to the XML parser is '' and minidom is expected
        to reject it -- the element markup appears to be missing from this
        file; restore it before relying on this method.
        '''
        fio = FileIOExtra()
        for cs in self.loadListCodeSystem():
            for valset in cs.listValueSet:
                # Document header pieces (empty in this copy of the file).
                parts = ['', '', '']
                for valsetVersion in valset.listValueSetVersion:
                    for csc in valsetVersion.listCodeSystemConcept:
                        for des in csc.listDesignation:
                            # One (empty) piece per designation.
                            parts.append('')
                # Document footer piece.
                parts.append('')
                content = self._getAsPrettyXML(''.join(parts))
                fio.saveXMLFile(content, folderpath + '/' + valset.oid + '.xml')

    def _getversion(self):
        '''
        Return today's date as year, month and day concatenated without
        zero padding (e.g. '2011101' for 1 Oct 2011).

        The date is read once so the three parts can never come from
        different days around midnight.
        '''
        today = datetime.datetime.today()
        return str(today.year) + str(today.month) + str(today.day)

    def _getAsPrettyXML(self, ss):
        '''Parse the XML text ss and return it re-indented by minidom.'''
        dom = minidom.parseString(ss.encode('utf-8'))
        return dom.toprettyxml()
##
#
# FileIOExtra is used to read from and write to text files
#
##
class FileIOExtra:
    '''
    Small helper for writing generated XML to disk and reading files back.
    '''

    def __init__(self):
        pass

    def saveXMLFile(self, content, fileName):
        '''
        Write content, encoded as UTF-8, to fileName (overwriting it).

        The file is opened in binary mode because encoded bytes are written;
        a text-mode handle rejects bytes on Python 3. The 'with' block
        closes the handle even when the write fails.
        '''
        with open(fileName, 'wb') as fich:
            fich.write(content.encode('utf-8'))

    def getOriginalFile(self, fileName):
        '''
        Return the whole content of fileName, or '' when the path does not
        exist.
        '''
        if not os.path.exists(fileName):
            return ''
        with open(fileName, 'r') as fich:
            return fich.read()
if __name__ == '__main__':
    # Python 2 idiom: reload(sys) makes sys.setdefaultencoding() available
    # again so implicit str/unicode conversions use UTF-8.
    reload(sys)
    sys.setdefaultencoding("utf-8")
    db = DBDAO()
    # Banner: today's year, month and day concatenated without padding.
    stamp = ''.join([
        str(datetime.datetime.today().year),
        str(datetime.datetime.today().month),
        str(datetime.datetime.today().day),
    ])
    print(stamp)
    # Export one XML file per value set into the configured folder.
    db.generateListRetrieveValueSet(FOLDER_RESULT)