#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Mapper resolving the VODML annotation of a VOTable onto its table columns.

The annotation is read from the ``VODML`` block of the document.  Each leaf
``VALUE`` found under the selected ``TEMPLATES`` element is stored in
``Mapper.value_map`` and later bound to the ``FIELD`` or ``PARAM`` it refers to.
"""
from lxml import etree
from xml.sax.expatreader import AttributesImpl
import sys
import urllib
import urllib.request
import re
import copy
import traceback
from astropy.io.votable import parse, parse_single_table
from globals_mapper import Globals_Mapper
from collections import OrderedDict
from util_classes import *


class Mapper:
    """Build a map from model role paths to value descriptors for one table.

    Keys of ``value_map`` are role paths made of ``#``-prefixed ``dmrole``
    values (e.g. ``#a#b``); entries added by ``add_shorcuts`` are aliases of
    those keys.  Fatal problems are reported through ``log_error`` which
    terminates the process with ``sys.exit(1)``.
    """
    #
    # global resources
    #
    # Separators used to split a role path into its elements
    path_separator = re.compile(r"[\.:#]+")

    def __init__(self, name, filename, master_table_id=None, root_role=None):
        """Store the mapper configuration; nothing is read from the file yet.

        :param name: label used as prefix of every printed message
        :param filename: path or http(s) URL of the annotated VOTable
        :param master_table_id: value of ``TEMPLATES/@tableref`` to work on
        :param root_role: ``dmrole`` of the INSTANCE/COLLECTION to start from.
            When both ``master_table_id`` and ``root_role`` are omitted, the
            role is set to ``"root"`` and the table is discovered later.
            A ``root_role`` without ``master_table_id`` is a fatal error.
        """
        self.name = name
        self.filename = filename
        self.xml_tree = None
        self.master_table_id = master_table_id
        self.root_role = root_role
        self.primary_key = None
        self.foreign_key = None
        self.master_template_node = None
        self.value_map = OrderedDict()
        self.foreign_table_id = None
        self.foreign_mappers = {}
        self.globals_mapper = None

        has_role = root_role is not None
        has_table = master_table_id is not None
        if has_role and not has_table:
            self.log_error("Init: a table ID must be specified when a root_role ( " + root_role + ")is given")
        elif has_table and not has_role:
            # A table without root role is accepted: every INSTANCE of the
            # table's TEMPLATES is parsed (see read_annotation).
            pass
        elif has_role and has_table:
            self.print_message("Init mapper on INSTANCE[@dmrole='" + root_role + "'] in table " + self.master_table_id)
        else:
            self.root_role = "root"
            self.print_message("Init mapper on the first INSTANCE[@dmrole='root'] found")

    def __str__(self):
        """Return a one-line description of the mapper."""
        # master_table_id is still None for a "root" mapper that has not read
        # its annotation yet, hence the explicit str() conversion.
        return "Mapper " + self.name + " on table '" + str(self.master_table_id) + "' of " + self.filename

    def __repr__(self):
        """Return a multi-line dump: keys, native entries, then shortcuts."""
        parts = [self.__str__() + "\n"]
        parts.append("Primary key\n ")
        parts.append((str(self.primary_key) if self.primary_key is not None else "None") + "\n")
        parts.append("Foreign key\n ")
        parts.append((str(self.foreign_key) if self.foreign_key is not None else "None") + "\n")
        parts.append("Native keys\n")
        for key, value in self.value_map.items():
            if key.startswith('#'):
                parts.append(" " + key + " : " + str(value) + "\n")
        parts.append("Shortcuts\n")
        for key, value in self.value_map.items():
            if not key.startswith('#'):
                parts.append(" " + key + " : " + str(value) + "\n")
        return "".join(parts)

    def log_error(self, msg):
        """Print ``msg`` and the current call stack, then exit with status 1."""
        self.print_message(msg)
        traceback.print_stack()
        sys.exit(1)

    def print_message(self, message):
        """Print ``message`` prefixed with the mapper name."""
        print("Mapper " + self.name + " " + message)

    def read_annotation(self):
        """Parse the file and populate ``value_map`` and the key descriptors.

        Steps: load the XML, locate the VODML/TEMPLATES/TABLE elements, read
        primary/foreign keys, walk the root tuple(s), resolve ``@`` references
        against FIELD/PARAM elements and finally add the shortcuts.
        """
        self.print_message("Reading FILE " + self.filename)
        #
        # init parser
        #
        if self.filename.startswith(("http://", "https://")):
            # The document is parsed while the response is open; the
            # connection is released as soon as the tree is built.
            with urllib.request.urlopen(self.filename) as response:
                self.xml_tree = etree.ElementTree(file=response)
        else:
            self.xml_tree = etree.parse(self.filename)

        self.locate_resources()
        self.parse_keys()

        if self.root_role is None:
            # No root role: every INSTANCE directly under TEMPLATES is a root
            for node in self.master_template_node.xpath("./INSTANCE"):
                role = node.get("dmrole")
                self.root_class_node = node
                self.print_message("Root class with dmrole=" + role + " found")
                self.parse_tuple(self.root_class_node, "#" + role)
        elif self.root_role == "root":
            self.parse_tuple(self.root_class_node, "")
        else:
            self.parse_tuple(self.root_class_node, "#" + self.root_role)

        self.resolve_references()
        self.resolve_filter_references()
        self.add_shorcuts()

    def locate_resources(self):
        """Locate the VODML block, the root node and the master TEMPLATES/TABLE.

        Sets ``vodml_node``, ``master_template_node`` and, unless
        ``root_role`` is None, ``root_class_node``.  For the ``"root"`` role
        ``master_table_id`` is taken from the enclosing ``TEMPLATES/@tableref``.
        Any missing element is fatal.
        """
        self.vodml_node = self.xml_tree.find("//VODML")
        if self.vodml_node is None:
            self.print_message("Cannot find VODML block")
            sys.exit(1)
        self.print_message("VODML block located looking for role " + str(self.root_role))

        if self.root_role == "root":
            base = self.xml_tree.find("//TEMPLATES/INSTANCE[@dmrole='root']")
            if base is not None:
                # "..." is evaluated as ".." followed by ".": the parent node
                self.master_table_id = base.find("...").get("tableref")
                self.print_message("INSTANCE[@dmrole='root'] located in TEMPLATES[@tableref='" + self.master_table_id + "']")
                self.root_class_node = base
            else:
                self.log_error("//TEMPLATES/INSTANCE[@dmrole='root'] not found")
        elif self.root_role is None:
            self.print_message("All INSTANCEs located in TEMPLATES[@tableref='" + self.master_table_id + "'] will be parsed")
        else:
            # The root can be either an INSTANCE or a COLLECTION
            base = self.xml_tree.find("//TEMPLATES[@tableref='" + self.master_table_id
                                      + "']/INSTANCE[@dmrole='" + self.root_role + "']")
            if base is not None:
                self.print_message("INSTANCE[@dmrole='" + self.root_role + "'] located in TEMPLATES[@tableref='" + self.master_table_id + "']")
                self.root_class_node = base
            else:
                base = self.xml_tree.find("//TEMPLATES[@tableref='" + self.master_table_id
                                          + "']/COLLECTION[@dmrole='" + self.root_role + "']")
                if base is not None:
                    self.print_message("COLLECTION[@dmrole='" + self.root_role + "'] located in TEMPLATES[@tableref='" + self.master_table_id + "']")
                    self.root_class_node = base
                else:
                    self.log_error("COLLECTION or INSTANCE[@dmrole='" + self.root_role + "'] not found in TEMPLATES[@tableref='" + self.master_table_id + "']")

        if self.xml_tree.find("//TABLE[@ID='" + self.master_table_id + "']") is None:
            self.log_error("Cannot find TABLE[@ID='" + self.master_table_id + "']")
        self.print_message("TABLE[@ID='" + self.master_table_id + "'] located")

        self.master_template_node = self.xml_tree.find("//TEMPLATES[@tableref='" + self.master_table_id + "']")
        if self.master_template_node is None:
            self.log_error("Cannot find TEMPLATES[@tableref'=" + self.master_table_id + "']")
        self.print_message("TEMPLATES[@reftable'" + self.master_table_id + "'] located")

    def parse_keys(self):
        """Read the PRIMARYKEY of the master table and its matching FOREIGNKEY.

        When a FOREIGNKEY with the same ``source`` exists in any TEMPLATES, a
        secondary ``Mapper`` is created and read for the table owning it.
        Nothing happens when the master TEMPLATES has no PRIMARYKEY.
        """
        pkn = self.vodml_node.find("./TEMPLATES[@tableref='" + self.master_table_id + "']/PRIMARYKEY")
        if pkn is None:
            # Without primary key there is no source to match a foreign key
            return
        pk = pkn.get("source")
        self.primary_key = self.build_value_descriptor(pk)
        self.print_message(pk + " found as primary key in table " + self.master_table_id)

        skn = self.vodml_node.find("./TEMPLATES/FOREIGNKEY[@source='" + pk + "']")
        if skn is not None:
            self.foreign_key = self.build_value_descriptor(skn.get("source"))
            foreign_table_id = skn.find("...").get("tableref")
            self.foreign_key.table_id = foreign_table_id
            self.foreign_mappers[foreign_table_id] = Mapper("Secondary", self.filename, foreign_table_id)
            self.foreign_mappers[foreign_table_id].read_annotation()
            self.print_message(skn.get("source") + " found as foreign key in table " + foreign_table_id)

    def parse_tuple(self, node, key, multiplicity=None):
        """Parse an INSTANCE element and register its leaves in ``value_map``.

        :param node: the INSTANCE element; a missing ``dmrole`` is fatal
        :param key: role path of the enclosing element
        :param multiplicity: overwritten before any use in this method; kept
            for call compatibility

        An INSTANCE carrying ``@tableref`` has no content of its own: its
        values are copied from a mapper on the referenced table, or from the
        GLOBALS block when no such table exists.
        """
        role = node.get("dmrole")
        if role is None:
            self.log_error("tuple without dmrole found")
        # avoids duplicated path elements
        if key != ("#" + role):
            key = key + "#" + role
        # Ignore root role which is just a marker without model semantics
        if key == "#root":
            key = ""

        ref = node.get("tableref")
        if ref is None:
            # Plain tuple: parse its content
            parent = node.find("...")
            multiplicity = "*" if parent.tag == "COLLECTION" else None
            for child in node.xpath("*"):
                if child.tag == 'VALUE':
                    self.parse_value(child, key, multiplicity)
                elif child.tag == 'INSTANCE':
                    self.parse_tuple(child, key, multiplicity)
                elif child.tag == 'COLLECTION':
                    # The tag alone decides; elements with any other tag are
                    # not descended into.
                    self.parse_collection(child, key)
            return

        # The tuple is a reference, it has no fields. Its content must be
        # searched either in another table or in GLOBALS.
        if ref == self.foreign_table_id:
            # NOTE(review): foreign_table_id is only ever set to None in the
            # code visible here, so this branch looks unreachable - confirm.
            if ref not in self.foreign_mappers:
                self.foreign_mappers[ref] = Mapper("Foreign", self.filename, master_table_id=ref, root_role=role)
                self.foreign_mappers[ref].read_annotation()
            sdmrole = "#" + role
            coll_node = node.find("...")
            coll_role = key + "#" + coll_node.get("dmrole")
            # Only the last path element is kept as multiplicity marker
            coll_role = self.path_separator.split(coll_role)[-1]
            for fk, fv in self.foreign_mappers[ref].value_map.items():
                if fk.startswith(sdmrole):
                    v = copy.deepcopy(fv)
                    v.multiplicity = coll_role
                    v.table_id = self.foreign_table_id
                    self.value_map[fk] = v
        elif self.xml_tree.find("//TABLE[@ID='" + ref + "']") is not None:
            # The reference targets another table of the document
            if ref not in self.foreign_mappers:
                # Create a mapper parsing all tuples of the referenced table
                self.foreign_mappers[ref] = Mapper("Foreign", self.filename, master_table_id=ref)
                self.foreign_mappers[ref].read_annotation()
            sdmrole = "#" + role
            multiplicity = None
            parent = node.find("...")
            if parent.tag == "COLLECTION":
                multiplicity = "#" + parent.get("dmrole") + key
            for fk, fv in self.foreign_mappers[ref].value_map.items():
                if fk.startswith(sdmrole):
                    # the key can carry a numeric suffix when roles are duplicated
                    if multiplicity is not None:
                        multiplicity = "#" + parent.get("dmrole") + "#" + fk.split("#")[1]
                    v = copy.deepcopy(fv)
                    v.multiplicity = multiplicity
                    v.table_id = ref
                    self.value_map[fk] = v
        else:
            # Nothing in another table, let's have a look at the globals
            self.print_message("Looks for " + key + " in GLOBALS ")
            if self.globals_mapper is None:
                self.globals_mapper = Globals_Mapper(self.filename)
                self.globals_mapper.read_annotation()
            if key in self.globals_mapper.root_tuples:
                for rtk, rtvalue in self.globals_mapper.root_tuples[key].items():
                    self.value_map[rtk] = rtvalue
            else:
                self.log_error("Ref '" + ref + "' cannot be solved neither in GLOBALS nor by join")

    def parse_value(self, node, key, multiplicity=None):
        """Register a VALUE element in ``value_map``.

        The descriptor holds ``"@" + @ref`` when the element refers to a
        column/param, otherwise the content of ``@value``.  A VALUE with
        neither attribute is fatal.  When the resulting key is already used,
        a counter is appended to its first path element until it is unique.
        """
        data_source = node.get("ref")
        val = None
        if data_source is not None:
            val = "@" + data_source
        else:
            data_source = node.get("value")
            if data_source is not None:
                # Literal value: store the attribute content itself
                val = data_source
            else:
                self.log_error(key + " Value without neither @ref nor @value found")

        value = self.build_value_descriptor(val, multiplicity)
        ckey = key + "#" + node.get("dmrole")
        self.parse_filter(node, value)

        unique_key = ckey
        path = ckey.split('#')
        first_role = path[1]
        num = 1
        while unique_key in self.value_map:
            path[1] = first_role + str(num)
            unique_key = "#".join(path)
            num += 1
        self.value_map[unique_key] = value

    def parse_collection(self, node, key, multiplicity=None):
        """Parse a COLLECTION element.

        With ``@tableref`` the values come from a mapper on the referenced
        table, rooted at the collection ``dmrole``; otherwise the children
        are parsed with the same ``key``.

        :param multiplicity: not used by this method
        """
        ref = node.get("tableref")
        if ref is None:
            for child in node.xpath("*"):
                if child.tag == 'VALUE':
                    self.parse_value(child, key)
                elif child.tag == 'INSTANCE':
                    self.parse_tuple(child, key)
                elif child.tag == 'COLLECTION':
                    self.parse_collection(child, key)
            return

        dmrole = node.get("dmrole")
        if dmrole is None:
            self.log_error("Collection reference in joined table " + ref + " has no dmrole")
        if ref not in self.foreign_mappers:
            self.foreign_mappers[ref] = Mapper("Foreign", self.filename, master_table_id=ref, root_role=dmrole)
            self.foreign_mappers[ref].read_annotation()
        sdmrole = "#" + dmrole
        coll_role = key + "#" + dmrole
        for fk, fv in self.foreign_mappers[ref].value_map.items():
            if fk.startswith(sdmrole):
                v = copy.deepcopy(fv)
                v.multiplicity = coll_role
                v.table_id = ref
                self.value_map[fk] = v

    def parse_filter(self, node, value):
        """Attach the ``@filter`` of ``node`` (``param=operand``) to ``value``.

        Does nothing when the attribute is absent; a filter without ``=`` is
        fatal.
        """
        filter_expr = node.get("filter")
        if filter_expr is None:
            return
        sfilter = filter_expr.split("=")
        # str.split always returns at least one item: both sides are required
        if len(sfilter) >= 2:
            value.set_filter(sfilter[0], sfilter[1])
        else:
            self.log_error("Filter " + filter_expr + " not understood")

    def build_value_descriptor(self, name, multiplicity=None):
        """Return a ``ValueDescripor`` for ``name`` bound to the master table.

        The call stack is printed when ``name`` is None to help locate the
        annotation element at fault; the descriptor is built anyway.
        """
        if name is None:
            traceback.print_stack()
        retour = ValueDescripor(name, self.master_table_id)
        retour.multiplicity = multiplicity
        return retour

    def resolve_references(self):
        """Resolve every ``@`` reference of ``value_map`` and of both keys."""
        for value in self.value_map.values():
            if value.value_ref.startswith("@"):
                self.resolve_reference(value)
        for key_descriptor in (self.primary_key, self.foreign_key):
            if key_descriptor is not None and key_descriptor.value_ref.startswith("@"):
                self.resolve_reference(key_descriptor)

    def resolve_reference(self, value):
        """Bind ``value`` to the FIELD or PARAM of its table named by its ref.

        FIELDs are matched on ``@ID`` then ``@name`` and recorded with their
        1-based position.  If no FIELD matches, PARAMs are tried: the
        descriptor then has no index and ``value_ref`` is replaced by the
        PARAM ``@value``.  An unresolved reference is fatal.
        """
        table = value.table_id
        ref = value.value_ref[1:]
        found = False
        fields = self.vodml_node.xpath("//TABLE[@ID='" + table + "']//FIELD")
        for pos, node in enumerate(fields, start=1):
            # No early exit: as before, the last matching FIELD wins
            if node.get("ID") == ref:
                value.set_field_descriptor(node.get("ID"), pos)
                found = True
            elif node.get("name") == ref:
                value.set_field_descriptor(node.get("name"), pos)
                found = True

        if not found:
            for node in self.vodml_node.xpath("//TABLE[@ID='" + table + "']//PARAM"):
                # Param values are taken as constants: no field index
                if node.get("ID") == ref:
                    value.set_field_descriptor(node.get("ID"), None)
                    value.value_ref = node.get("value")
                    found = True
                elif node.get("name") == ref:
                    value.set_field_descriptor(node.get("name"), None)
                    value.value_ref = node.get("value")
                    found = True

        if not found:
            self.log_error("Reference " + str(value) + " cannot be solved")

    def resolve_filter_references(self):
        """Bind the ``@`` parameter of each filter to a FIELD or a PARAM.

        FIELDs are matched on ``@ID`` among all FIELDs of the document and
        recorded with their 1-based position; PARAMs are used as fallback
        with their ``@value`` in place of the position.  An unresolved
        filter reference is fatal.
        """
        for value in self.value_map.values():
            filter_block = value.filter
            if filter_block is None:
                continue
            val = filter_block.param
            if val.startswith("@"):
                ref = val[1:]
                found = False
                # NOTE(review): the position is counted over every FIELD of
                # the document, not only those of the value's table - confirm
                # this is intended for multi-table files.
                for pos, node in enumerate(self.vodml_node.xpath("//FIELD"), start=1):
                    if node.get("ID") == ref:
                        value.set_filter(ref, filter_block.operand)
                        filter_block.set_field_descriptor(node.get("name"), pos)
                        found = True
                if not found:
                    for node in self.vodml_node.xpath("//PARAM"):
                        if node.get("ID") == ref:
                            filter_block.set_field_descriptor(node.get("name"), node.get("value"))
                            found = True
                if not found:
                    self.log_error("Filter reference " + val + " cannot be solved")
            # Keep the original filter object, which now carries the field
            # descriptor, whatever set_filter stored on the value.
            value.filter = filter_block

    def add_shorcuts(self):
        """Add to ``value_map`` the aliases computed by ``ShortCut``.

        Only native keys (starting with ``#``) are given to ``ShortCut``;
        each returned shortcut points to the same descriptor as its key.
        """
        filtered_dict = {k: v for (k, v) in self.value_map.items() if k.startswith("#")}
        sc = ShortCut(filtered_dict.keys())
        scm = sc.get_shortcuts()
        for shortcut, key in scm.items():
            self.value_map[shortcut] = self.value_map[key]

    def read_row(self, numpy_row):
        """Set the data row used by the ``get_field_value_*`` methods."""
        self.row = numpy_row

    def get_field_value_from_key(self, field_ref):
        """Return the value mapped by ``field_ref`` (raises KeyError if unknown).

        Descriptors without field descriptor return their ``value_ref``.
        """
        val = self.value_map[field_ref]
        if val.field_descriptor is not None:
            return self.get_field_value_from_descriptor(val)
        return val.value_ref

    def get_field_value_from_descriptor(self, val):
        """Return the value designated by the descriptor ``val``.

        For an ``@`` reference the cell of the current row is returned
        (``MaskedArray`` cells as a list of their data).  When the descriptor
        has a filter, the cell is returned only if the filter column equals
        the operand, else None.  In every other case, or when no row has
        been set, ``value_ref`` is returned.
        """
        if val.value_ref is None or not val.value_ref.startswith("@"):
            return val.value_ref
        pos = val.field_descriptor.index
        row_filter = val.filter
        # If the table has no data (just params) there is no row attribute
        if not hasattr(self, 'row'):
            return val.value_ref
        cell = self.row[pos - 1]
        if row_filter is not None:
            filter_pos = row_filter.field_descriptor.index
            operand = row_filter.operand.strip('"').strip("'")
            filter_cell = self.row[filter_pos - 1]
            # Cells may be bytes or already-decoded text
            if isinstance(filter_cell, bytes):
                filter_cell = filter_cell.decode("utf-8")
            return cell if filter_cell == operand else None
        if cell.__class__.__name__ == "MaskedArray":
            return list(cell.data)
        return cell

    def get_data(self):
        """Parse the file with astropy and return the master table array."""
        votable = parse(self.filename)
        return votable.get_table_by_id(self.master_table_id).array