'''
Generate one XML file per value set from the ``tsam`` PostgreSQL database.

Created on 1 oct. 2011

@author: abderrazek boufahja

The script loads code systems, their value sets, the current value-set
versions, the current concepts of each version and the current English
designations of each concept, then writes one ``<value set oid>.xml`` file
per value set into FOLDER_RESULT.

NOTE(review): several string literals that build XML in this file are empty
or whitespace-only in this copy. As written, the assembled text cannot be
parsed by ``minidom.parseString``. Presumably the XML markup was lost from
this copy of the file -- confirm against the upstream source before running.
The literals are reproduced unchanged.
'''
import contextlib
import datetime
import os
import re
import sys
from difflib import ndiff
from xml.dom import minidom

try:
    import psycopg2
except ImportError:
    # Keep the DTO and XML helpers importable on machines without the
    # PostgreSQL driver; DBDAO.connect() reports the missing dependency.
    psycopg2 = None

SENDMAIL = '/usr/sbin/sendmail'  # sendmail location
NOTIFICATION_BY_MAIL = True  # True or False
CALCULATE_DIFFERENCE = False  # True or False
LIST_ADMIN_MAIL = ['abderrazek.boufahja@gmail.com']
SENDER = 'epsos-code-updater@irisa.fr'
DB_TSAM_NAME = 'tsam'
DB_USER = 'gazelle'
FOLDER_RESULT = '/home/aboufahj/workspace/pivot-code-gen/svsepsos'


##
#
# DTO classes used by this script
#
##
class CodeSystem:
    '''A row of the ``code_system`` table plus its loaded value sets.'''

    def __init__(self):
        self.id = 0
        self.oid = ''
        self.name = ''
        self.description = ''
        self.listValueSet = []
        self.listCodeSystemVersion = []

    def __str__(self):
        # %-formatting tolerates None (NULL columns), which '+' does not.
        return '(%s,%s,%s,%s)' % (self.id, self.oid, self.name,
                                  self.description)

    def _getAsXML(self):
        '''Return the concatenated XML of every value set in this system.'''
        # NOTE(review): the wrapper literals hold only a newline here;
        # presumably they carried the opening/closing markup -- confirm.
        parts = ['\n']
        parts.extend(valset._getAsXML() for valset in self.listValueSet)
        parts.append('\n')
        return ''.join(parts)


class CodeSystemConcept:
    '''A row of the ``code_system_concept`` table plus its designations.'''

    def __init__(self):
        self.id = 0
        self.code = ''
        self.status = ''
        self.statusDate = ''
        self.definition = ''
        self.codeSystemVersion = None
        self.listDesignation = []


class CodeSystemVersion:
    '''Plain holder for the attributes of a code-system version.'''

    def __init__(self):
        self.id = 0
        self.fullName = ''
        self.localName = ''
        self.previousVersionId = 0
        self.effectiveDate = ''
        self.releaseDate = ''
        self.status = ''
        self.statusDate = ''
        self.description = ''
        self.copyright = ''
        self.source = ''
        self.codeSystem = None
        self.listCodeSystemConcept = []


class Designation:
    '''A row of the ``designation`` table, linked back to its concept.'''

    def __init__(self):
        self.id = 0
        self.designation = ''
        self.languageCode = ''
        self.type = ''
        self.isPreffered = False
        self.status = ''
        self.statusDate = ''
        self.codeSystemConcept = None

    def _getAsXML(self):
        '''Return the XML fragment for this designation.'''
        # NOTE(review): only a newline is emitted in this copy; presumably
        # the element markup was lost -- confirm against the original.
        ss = '\n'
        return ss


class TranscodingAssociation:
    '''Plain holder for a source/target concept association.'''

    def __init__(self):
        self.id = 0
        self.targetConcept = None
        self.sourceConcept = None
        self.quality = ''
        self.status = ''
        self.statusDate = ''


class ValueSet:
    '''A row of the ``value_set`` table plus its loaded versions.'''

    def __init__(self):
        self.id = 0
        self.oid = ''
        self.epsosName = ''
        self.description = ''
        self.parentCodeSystem = None
        self.listValueSetVersion = []

    def _getAsXML(self):
        '''Return the XML of every designation reachable from this set.'''
        # NOTE(review): wrapper literals hold only a newline in this copy.
        parts = ['\n']
        for valsetVersion in self.listValueSetVersion:
            for csc in valsetVersion.listCodeSystemConcept:
                parts.extend(des._getAsXML() for des in csc.listDesignation)
        parts.append('\n')
        return ''.join(parts)


class ValueSetVersion:
    '''A row of the ``value_set_version`` table plus its loaded concepts.'''

    def __init__(self):
        self.id = 0
        self.versionName = ''
        self.effectiveDate = ''
        self.releaseDate = ''
        self.status = ''
        self.statusDate = ''
        self.description = ''
        self.previousVersion = None
        self.valueSet = None
        self.listCodeSystemConcept = []


##
#
# XMLDAO class used for XML parsing of the original epsos-code.xml file
#
##
class XMLDAO:
    def __init__(self):
        pass

    def getListCodeSystem(self, epsosFilePath):
        '''Parse *epsosFilePath* and return its ``CodeSystem`` elements.

        The parsed document and its first ``Codes`` element are kept on the
        instance as ``self.dom`` and ``self.root``.
        '''
        self.dom = minidom.parse(epsosFilePath)
        self.root = self._getNode(self.dom, 'Codes')
        return self.root.getElementsByTagName('CodeSystem')

    def getListValueSetFromCodeSystem(self, codeSystemNode):
        '''Return the ``ValueSet`` elements found under *codeSystemNode*.'''
        return codeSystemNode.getElementsByTagName('ValueSet')

    def _getNode(self, parent_node, name):
        '''Return the first *name* element under *parent_node*, or None.'''
        try:
            return parent_node.getElementsByTagName(name)[0]
        except (IndexError, AttributeError):
            # IndexError: no such element; AttributeError: parent is None
            # or not a DOM node.
            return None


##
#
# DBDAO used to access the tsam database and load objects from it
#
##
class DBDAO:
    def __init__(self):
        self.conn = None
        self.cur = None

    def connect(self):
        '''Open a connection to the tsam database.

        Sets ``self.conn`` and ``self.cur``. A connection that is still
        open from an earlier call is closed first so it is not leaked.

        Raises ImportError when psycopg2 is not installed.
        '''
        if psycopg2 is None:
            raise ImportError('psycopg2 is required to connect to the '
                              + DB_TSAM_NAME + ' database')
        self.close()
        self.conn = psycopg2.connect(
            'dbname=' + DB_TSAM_NAME + ' user=' + DB_USER)
        self.cur = self.conn.cursor()

    def close(self):
        '''Close the current connection, if any, and forget it.'''
        conn = self.conn
        self.conn = None
        self.cur = None
        if conn is not None and not conn.closed:
            conn.close()

    @contextlib.contextmanager
    def _session(self):
        '''Ensure a connection is open for the duration of the block.

        The outermost caller opens the connection and closes it on exit;
        nested calls (and callers that connected beforehand) reuse the
        existing one and leave it open.
        '''
        owner = self.conn is None or self.conn.closed
        if owner:
            self.connect()
        try:
            yield
        finally:
            if owner:
                self.close()

    def _fetchAll(self, sql, params=None):
        '''Run *sql* with *params* on a fresh cursor and return all rows.'''
        cur = self.conn.cursor()
        try:
            cur.execute(sql, params)
            return cur.fetchall()
        finally:
            cur.close()

    def loadListCodeSystem(self):
        '''Load every code system with its value sets, sorted by oid.'''
        res = []
        with self._session():
            for x in self._fetchAll("SELECT DISTINCT * FROM code_system;"):
                toAdd = CodeSystem()
                # Columns are read by position from SELECT *.
                toAdd.id = x[0]
                toAdd.oid = x[1]
                toAdd.name = x[2]
                toAdd.description = x[3]
                toAdd.listValueSet = self.loadListValueSetForCodeSystem(toAdd)
                res.append(toAdd)
        return sorted(res, key=lambda cs: cs.oid)

    def loadListValueSetForCodeSystem(self, toAdd):
        '''Load the value sets whose parent is *toAdd*, sorted by oid.

        Rows without an oid are skipped. Returns [] when ``toAdd.id`` is 0.
        '''
        res = []
        if toAdd.id == 0:
            return res
        with self._session():
            rows = self._fetchAll(
                'SELECT DISTINCT * FROM value_set '
                'WHERE parent_code_system_id=%s;', (toAdd.id,))
            for x in rows:
                valset = ValueSet()
                valset.id = x[0]
                valset.oid = x[1]
                valset.epsosName = x[2]
                valset.description = x[3]
                if valset.oid is None:
                    # The oid names the output file, so such rows are
                    # unusable.
                    continue
                valset.parentCodeSystem = toAdd
                valset.listValueSetVersion = (
                    self.loadListValueSetVersionForValueSet(valset))
                res.append(valset)
        return sorted(res, key=lambda cs: cs.oid)

    def loadListValueSetVersionForValueSet(self, valset):
        '''Load the versions of *valset* whose status is 'Current'.

        Returns [] when ``valset.id`` is 0.
        '''
        res = []
        if valset.id == 0:
            return res
        with self._session():
            rows = self._fetchAll(
                "SELECT DISTINCT * FROM value_set_version "
                "WHERE value_set_id=%s "
                "AND value_set_version.status='Current';", (valset.id,))
            for x in rows:
                valsetver = ValueSetVersion()
                valsetver.id = x[0]
                valsetver.versionName = x[1]
                valsetver.effectiveDate = x[2]
                # Column 3 is not read; status is taken from column 4.
                valsetver.status = x[4]
                valsetver.valueSet = valset
                valsetver.listCodeSystemConcept = (
                    self.loadListCodeSystemConceptForValueSetVersion(
                        valsetver))
                res.append(valsetver)
        return res

    def loadListCodeSystemConceptForValueSetVersion(self, valsetver):
        '''Load the 'Current' concepts of *valsetver*, sorted by code.

        Returns [] when ``valsetver.id`` is 0.
        '''
        res = []
        if valsetver.id == 0:
            return res
        # The original carried a disabled variant of this query that also
        # joined code_system_version and filtered on the parent code
        # system; only the simpler query below was active.
        with self._session():
            rows = self._fetchAll(
                "SELECT DISTINCT * FROM code_system_concept "
                "WHERE code_system_concept.id IN "
                "(SELECT code_system_concept_id from x_concept_value_set "
                "WHERE x_concept_value_set.value_set_version_id=%s) "
                "AND code_system_concept.status='Current' ;",
                (valsetver.id,))
            for x in rows:
                csc = CodeSystemConcept()
                csc.id = x[0]
                csc.code = x[1]
                csc.status = x[2]
                csc.statusDate = x[3]
                csc.definition = x[4]
                csc.listDesignation = (
                    self.loadListDesignationForCodeSystemConcept(csc))
                res.append(csc)
        return sorted(res, key=lambda csc: csc.code)

    def loadListDesignationForCodeSystemConcept(self, csc):
        '''Load the 'Current' English designations of concept *csc*.

        Returns [] when ``csc.id`` is 0.
        '''
        res = []
        if csc.id == 0:
            return res
        with self._session():
            rows = self._fetchAll(
                "SELECT DISTINCT * FROM designation "
                "WHERE (code_system_concept_id=%s "
                "and designation.status='Current' "
                "and designation.language_code='en') ;", (csc.id,))
            for x in rows:
                des = Designation()
                des.id = x[0]
                des.designation = x[1]
                des.languageCode = x[2]
                des.type = x[3]
                des.isPreffered = x[4]
                des.status = x[5]
                des.statusDate = x[6]
                des.codeSystemConcept = csc
                res.append(des)
        return res

    def getListCodeSystemAsFullXML(self, listCodeSystem):
        '''Return *listCodeSystem* as one pretty-printed XML document.'''
        # NOTE(review): the header/footer literals are whitespace-only or
        # empty in this copy; presumably markup was lost -- confirm.
        ss = '\n'
        ss = ss + ' '
        for cs in listCodeSystem:
            ss = ss + cs._getAsXML()
        ss = ss + self._getLanguageStringCodeAsXML()
        ss = ss + ''
        return self._getAsPrettyXML(ss)

    def _getLanguageStringCodeAsXML(self):
        # NOTE(review): a single space in this copy; presumably a static
        # XML fragment was lost here -- confirm against the original.
        return ''' '''

    def generateListRetrieveValueSet(self, folderpath):
        '''Write ``<folderpath>/<value set oid>.xml`` for every value set.'''
        fio = FileIOExtra()
        for cs in self.loadListCodeSystem():
            for valset in cs.listValueSet:
                # NOTE(review): every fragment appended below is empty in
                # this copy, so the document passed to _getAsPrettyXML is
                # empty; presumably the markup was lost -- confirm.
                res = ''''''
                res = res + ''
                res = res + ''
                for valsetVersion in valset.listValueSetVersion:
                    for csc in valsetVersion.listCodeSystemConcept:
                        for des in csc.listDesignation:
                            res = res + ''
                res = res + ''
                res = self._getAsPrettyXML(res)
                fio.saveXMLFile(res, folderpath + '/' + valset.oid + '.xml')

    def _getversion(self):
        '''Return today's date as year + month + day, without padding.'''
        # Read the clock once so the three parts cannot straddle midnight.
        # NOTE(review): month and day are not zero-padded, so e.g. 1 Nov
        # and 11 Jan both yield '<year>111'; left as-is because consumers
        # may depend on the current format.
        today = datetime.datetime.today()
        return str(today.year) + str(today.month) + str(today.day)

    def _getAsPrettyXML(self, ss):
        '''Parse the XML text *ss* and return it pretty-printed.'''
        dom = minidom.parseString(ss.encode('utf-8'))
        return dom.toprettyxml()


##
#
# FileIOExtra class is used to read from and write to a text file
#
##
class FileIOExtra:
    def __init__(self):
        pass

    def saveXMLFile(self, content, fileName):
        '''Write *content*, encoded as UTF-8, to *fileName*.'''
        # Encoded bytes are written, hence binary mode.
        with open(fileName, 'wb') as fich:
            fich.write(content.encode('utf-8'))

    def getOriginalFile(self, fileName):
        '''Return the content of *fileName*, or '' if it does not exist.'''
        if not os.path.exists(fileName):
            return ''
        with open(fileName, 'r') as fich:
            return fich.read()


if __name__ == '__main__':
    if sys.version_info[0] < 3:
        # Python 2 only: make implicit str/unicode conversions use UTF-8.
        reload(sys)
        sys.setdefaultencoding("utf-8")
    db = DBDAO()
    print('' + db._getversion())
    db.generateListRetrieveValueSet(FOLDER_RESULT)