# -*- coding: utf-8 -*-
"""
Created on Mon Oct 21 12:24:04 2024
@author: xhan
"""
# ODF2pd
import pandas as pd
import zipfile
import xml.etree.ElementTree as ET
import os
[docs]
def read_odf(path, languages = "all", usecols = None, skiprows=None, nrows=None, na_values = None):
"""
Read an Open Data Format (ODF) file into a Pandas DataFrame.
This function reads data from an ODF zipfile (containing data.csv and metadata.xml) and converts it into a pandas DataFrame.
It supports language selection, optional filtering of columns, skipping rows, and replacing specific values
with NaN.
Parameters
----------
path : str
The file path to the ODF file to be read.
languages : str or list of str, default "all"
Specifies the language(s) to extract from the file. Use "all" to read all available languages, or pass a single language code (e.g., "en").
usecols : list of int or str, optional
Specifies the columns to be read from the file. If None, all columns are read. Column selection can be by index or name.
skiprows : int or list of int, optional
Line numbers to skip (0-indexed) at the start of the file. Can be used to skip metadata or headers.
nrows : int, optional
The number of rows to read from the file. If None, all rows are read.
na_values : scalar, str, list-like, or dict, optional
Additional values to consider as NaN. If dict, applies per column.
Returns
-------
DataFrame
A pandas DataFrame containing the data and metadata from the ODF file.
Notes
-----
- The `languages` parameter allows for selecting specific localized data if the ODF file supports it.
- Metadata is stored in the `attrs` attribute of pandas objects- You can call the attributes with df.attrs or df['variable_name'].attrs.
Examples
--------
Read an ODF file and load all columns:
>>> import opendataformat as odf
>>> df = odf.read_odf("example_dataset.zip")
Read an ODF zipfile, selecting specific language:
>>> df = odf.read_odf("example.zip", languages="en")
"""
# if path has not suffix .zip" but a ".zip" file exists, .zip" is added to path
# if no file zipped file exists, but a folder with the name exists, the function tries to read
if (not path.endswith(".zip") and not os.path.exists(path)) or (not path.endswith(".zip") and os.path.exists(path + ".zip")) :
path = path + ".zip"
if not os.path.exists(path):
raise FileNotFoundError(f"The file {path} was not found.")
if not path.endswith(".zip") and (not os.path.exists(path + "/data.csv") or not os.path.exists(path + "/metadata.xml")):
raise FileNotFoundError(f"A file {path + '.zip'} was not found and in the folder {path} expected metadata.xml and data.csv.")
if '.zip' not in path:
file_not_zipped = True
else:
file_not_zipped = False
if file_not_zipped == False:
# Open zip data and xml file in it
with zipfile.ZipFile(path, 'r') as zip_ref:
if 'data.csv' not in zip_ref.namelist():
raise Exception(f"Expected data.csv in {path}")
if 'metadata.xml' not in zip_ref.namelist():
raise Exception(f"Expected metadata.xml in {path}")
try:
root=ET.fromstring(zip_ref.read('metadata.xml'))
except Exception as e:
raise Exception(f"{type(e).__name__} in reading metadata.xml in {path}. Check the xml file in the data file")
# Iterate through the tags in xml and remove prefix of each tag
for i in root.iter():
i.tag=i.tag.split('}')[-1]
#print(i.tag)
#read the csv file
with zip_ref.open('data.csv') as csv_file:
if (skiprows != None):
if (type(skiprows) == int):
skiprows = list(range(skiprows))
skiprows = [x + 1 for x in skiprows]
df = pd.read_csv(csv_file, encoding='UTF-8', usecols = usecols, skiprows=skiprows, nrows=nrows, na_values = na_values)
if usecols != None and all(isinstance(item, str) for item in usecols):
df = df[usecols]
# Make dataset dictionary
dataset_dic=make_dataset_dic(root)
# Make variables dictionary
variables_dic=make_variables_dic(root, variables = list(df.columns))
#remove languages that are not needed
if languages != "all":
if type(languages) == str:
languages = [languages]
keys_wrong_language = []
for key in dataset_dic.keys():
if 'label_' in key or 'description_' in key:
if key.split("_")[1] not in languages:
keys_wrong_language.append(key)
for key in keys_wrong_language:
dataset_dic.pop(key)
for varname in variables_dic.keys():
var_dic = variables_dic[varname]
keys_wrong_language = []
for key in var_dic.keys():
if 'label_' in key or 'labels_' in key or 'description_' in key:
if key.split("_")[1] not in languages:
keys_wrong_language.append(key)
for key in keys_wrong_language:
var_dic.pop(key)
variables_dic[varname] = var_dic
df.attrs=dataset_dic
for var_name, attributes in variables_dic.items():
if var_name in df.columns:
df[var_name].attrs=attributes
elif file_not_zipped == True:
metadata_path = os.path.join(path, 'metadata.xml')
data_csv_path = os.path.join(path, 'data.csv')
# Ensure files exist
if not os.path.exists(metadata_path) or not os.path.exists(data_csv_path):
raise ValueError("Expected metadata.xml and data.csv in {path}")
# Parse metadata.xml directly
try:
tree = ET.parse(metadata_path)
except Exception as e:
raise Exception(f"{type(e).__name__} in reading metadata.xml in {path}. Check the xml file in the data file")
root = tree.getroot()
# Process XML tags
for i in root.iter():
i.tag = i.tag.split('}')[-1]
# Load data.csv from folder and save dictionaries to DataFrame
if (skiprows != None):
if (type(skiprows) == int):
skiprows = list(range(skiprows))
skiprows = [x + 1 for x in skiprows]
df = pd.read_csv(data_csv_path, encoding='UTF-8', usecols = usecols, skiprows=skiprows, nrows=nrows, na_values = na_values)
if usecols != None and all(isinstance(item, str) for item in usecols):
df = df[usecols]
# Make dataset dictionary
dataset_dic=make_dataset_dic(root)
# Make variables dictionary
variables_dic=make_variables_dic(root, variables = list(df.columns))
#remove languages that are not needed
if languages != "all":
if type(languages) == str:
languages = [languages]
keys_wrong_language = []
for key in dataset_dic.keys():
if 'label_' in key or 'description_' in key:
if key.split("_")[1] not in languages:
keys_wrong_language.append(key)
for key in keys_wrong_language:
dataset_dic.pop(key)
for varname in variables_dic.keys():
var_dic = variables_dic[varname]
keys_wrong_language = []
for key in var_dic.keys():
if 'label_' in key or 'labels_' in key or 'description_' in key:
if key.split("_")[1] not in languages:
keys_wrong_language.append(key)
for key in keys_wrong_language:
var_dic.pop(key)
variables_dic[varname] = var_dic
df.attrs = dataset_dic
for var_name, attributes in variables_dic.items():
if var_name in df.columns:
df[var_name].attrs=attributes
return df
def make_dataset_dic(root):
dictionary = {} # Initialize the dictionary to store label entries
#study and dataset name
dictionary['study'] = root.findtext(".//stdyDscr/citation/titlStmt/titl")
dictionary['dataset'] = root.findtext(".//fileDscr/fileTxt/fileName")
# labels
titl_stmt = root.find('.//fileDscr/fileTxt/fileCitation/titlStmt')
if titl_stmt is not None:
# Loop through each child element in titlStmt
for elem in titl_stmt:
# Check if there is a language attribute
lang = elem.get('{http://www.w3.org/XML/1998/namespace}lang')
if lang:
# Store text with 'label_<language>' key if lang attribute exists
dictionary[f'label_{lang}'] = elem.text
else:
# Store text with 'label' key if no lang attribute exists
dictionary['label'] = elem.text
# Process descriptions in fileCont
for file_cont in root.findall('.//fileDscr/fileTxt/fileCont'):
lang = file_cont.get('{http://www.w3.org/XML/1998/namespace}lang')
if lang:
dictionary[f'description_{lang}'] = file_cont.text
else:
dictionary['description'] = file_cont.text
#URL
ExtLink = root.findall('.//fileDscr/notes/ExtLink')
if len(ExtLink) == 1:
dictionary['url'] = ExtLink[0].get('URI')
return dictionary
def make_variables_dic(root, variables):
dictionaries={}
for var in root.findall('.//dataDscr/var'):
varname = var.attrib.get('name')
if varname not in variables:
continue
# dictionary
dictionary = {}
# variable
dictionary['variable'] = var.attrib.get('name')
# Process `labl` elements
for labl_elem in var.findall('labl'):
# Check if there is a language attribute
lang = labl_elem.get('{http://www.w3.org/XML/1998/namespace}lang')
if lang:
# Store text with 'label_<language>' key if lang attribute exists
dictionary[f'label_{lang}'] = labl_elem.text
else:
# Store text with 'label' key if no lang attribute exists
dictionary['label'] = labl_elem.text
# Process `txt` elements
for txt_elem in var.findall('txt'):
# Check if there is a language attribute
lang = txt_elem.get('{http://www.w3.org/XML/1998/namespace}lang')
if lang:
# Store text with 'description_<language>' key if lang attribute exists
dictionary[f'description_{lang}'] = txt_elem.text
else:
# Store text with 'description' key if no lang attribute exists
dictionary['description'] = txt_elem.text
# Process `catgry` elements to accumulate labels by language
for catgry_elem in var.findall('catgry'):
# Get the category value
catValu_elem = catgry_elem.find('catValu')
if catValu_elem is not None:
cat_value = catValu_elem.text
else:
continue # Skip if there's no category value
# Loop through `labl` elements within `catgry`
for labl_elem in catgry_elem.findall('labl'):
lang = labl_elem.get('{http://www.w3.org/XML/1998/namespace}lang', 'default')
# Construct the key for labels by language (e.g., 'labels_en', 'labels_de')
labels_key = f'labels_{lang}' if lang != 'default' else 'labels'
# Initialize the dictionary for this language if not already present
if labels_key not in dictionary:
dictionary[labels_key] = {}
# Add the category value and corresponding label to the dictionary for the language
dictionary[labels_key][cat_value] = labl_elem.text
# Process `varFormat` for type
varFormat_elem = var.find('varFormat')
if varFormat_elem is not None:
dictionary['type'] = varFormat_elem.get('type')
# Process `ExtLink` for external URL
extLink_elem = var.find('.//notes/ExtLink')
if extLink_elem is not None:
dictionary['url'] = extLink_elem.get('URI')
dictionaries[varname] = dictionary
return dictionaries