add fods support

This commit is contained in:
iuvbio 2021-08-20 19:13:40 +02:00
parent a31ac7671b
commit db5c14409f
3 changed files with 109 additions and 45 deletions

View File

@@ -1,28 +1,14 @@
"""Imports an ods file into a DataFrame object"""
import ezodf
"""Imports an ods or fods file into a DataFrame object"""
from pathlib import Path
from .parsers import ods
from .tools import sanitize_df
from .parsers import fods, ods
EXT_MAP = {".ods": ods, ".fods": fods}
def read_ods(file_or_path, sheet=1, headers=True, columns=None):
"""
This function reads in the provided ods file and converts it to a
dictionary. The dictionary is converted to a DataFrame. Trailing empty rows
and columns are dropped from the DataFrame, before it is returned.
:param file_or_path: str
the path to the ODS file
:param sheet: int or str, default 1
if int, the 1 based index of the sheet to be read in. If str, the name of
the sheet to be read in
:param header: bool, default True
if True, the first row is read in as headers
:param columns: list, default None
a list of column names to be used as headers
:returns: pandas.DataFrame
the ODS file as a pandas DataFrame
"""
doc = ezodf.opendoc(file_or_path)
df = ods.load_ods(doc, sheet, headers, columns)
return sanitize_df(df)
loader = EXT_MAP.get(Path(file_or_path).suffix)
if not loader:
raise ValueError("Unknown filetype.")
return loader.read(file_or_path, sheet, headers=headers, columns=columns)

View File

@@ -1,15 +1,21 @@
from collections import OrderedDict
from collections import defaultdict
from lxml import etree
import pandas as pd
from ..tools import sanitize_df
BODY_TAG = "office:body"
SPREADSHEET_TAG = "office:spreadsheet"
OFFICE_KEY = "office"
TABLE_KEY = "table"
TABLE_TAG = "table:table"
TABLE_ROW_TAG = "table:table-row"
TABLE_CELL_TAG = "table:table-cell"
TABLE_CELL_TEXT_TAG = "text:p"
TABLE_CELL_REPEATED_ATTRIB = "number-columns-repeated"
VALUE_TYPE_ATTRIB = "value-type"
def get_sheet(spreadsheet, sheet_id):
@@ -19,7 +25,7 @@ def get_sheet(spreadsheet, sheet_id):
f"{TABLE_TAG}[@table:name='{sheet_id}']", namespaces=namespaces
)
if sheet is None:
raise KeyError(f"There is no sheet named {sheet_id}")
raise KeyError(f"There is no sheet named {sheet_id}.")
return sheet
tables = spreadsheet.findall(TABLE_TAG, namespaces=namespaces)
if sheet_id == 0 or sheet_id > len(tables):
@@ -27,6 +33,51 @@ def get_sheet(spreadsheet, sheet_id):
return tables[sheet_id - 1]
def parse_columns(cells, headers=True, columns=None):
    """Resolve the column names for a flat-ODS (fods) sheet.

    :param cells: list of rows, each row a list of lxml ``table:table-cell``
        elements. When ``headers`` is True the first row is popped off
        IN PLACE and consumed as the header row.
    :param headers: bool, default True
        if True, treat ``cells[0]`` as the header row
    :param columns: list, default None
        explicit column names; when given, they are returned as-is and the
        header row (if any) is still discarded
    :returns: tuple ``(columns, cells)`` — the resolved column names and the
        remaining data rows (header row removed when ``headers`` is True)
    """
    # NOTE: mutates the caller's list — the header row is removed in place.
    orig_columns = cells.pop(0) if headers else None
    if columns is None:
        if orig_columns:
            repeated_val = None
            columns = []
            # Tracks how often each repeated header value has been seen so
            # duplicates can be suffixed as "<value>.<count>".
            repeated_dict = defaultdict(lambda: 0)
            for i, col in enumerate(orig_columns):
                text = col.find(TABLE_CELL_TEXT_TAG, namespaces=col.nsmap)
                if text is not None:
                    value = text.text
                # `text is None` is already implied by the failed `if` above;
                # kept as written.
                elif text is None and repeated_val:
                    # Textless cell following a repeated one inherits the
                    # suffixed name computed on the previous iteration.
                    value = repeated_val
                else:
                    # Header cell with no text at all: synthesize a name,
                    # probing for the first free "unnamed.<idx>" slot.
                    value = "unnamed"
                    idx = 1
                    while "{}.{}".format(value, idx) in columns:
                        idx += 1
                    value = f"{value}.{idx}"
                # "table:number-columns-repeated" marks a cell standing in
                # for several identical columns.
                repeated = col.attrib.get(
                    f"{{{col.nsmap[TABLE_KEY]}}}{TABLE_CELL_REPEATED_ATTRIB}"
                )
                if repeated:
                    # NOTE(review): the repeat COUNT attribute value is never
                    # read, so a cell repeated N times still contributes only
                    # one column here — confirm against a sheet with wide
                    # repeats.
                    repeated_dict[value] += 1
                    repeated_val = f"{value}.{repeated_dict[value]}"
                # De-duplicate any remaining name clash with the positional
                # index.
                column = value if value not in columns else f"{value}.{i}"
                columns.append(column)
        else:
            # No header row: synthesize positional names sized to the first
            # data row's width.
            columns = [f"column.{i}" for i in range(len(cells[0]))]
    return columns, cells
def parse_value(cell):
    """Convert a single table cell element to a Python value.

    Returns ``None`` for a cell with no text node, a ``float`` when the
    cell's ``office:value-type`` attribute is ``"float"``, and the raw
    text string otherwise.
    """
    type_attr_key = f"{{{cell.nsmap[OFFICE_KEY]}}}{VALUE_TYPE_ATTRIB}"
    text_node = cell.find(TABLE_CELL_TEXT_TAG, namespaces=cell.nsmap)
    # An empty cell has no text:p child at all.
    if text_node is None:
        return None
    if cell.attrib.get(type_attr_key) == "float":
        return float(text_node.text)
    return text_node.text
def load_fods(doc, sheet_id, headers=True, columns=None):
if not isinstance(sheet_id, (str, int)):
raise ValueError("Sheet id has to be either `str` or `int`")
@@ -37,27 +88,28 @@ def load_fods(doc, sheet_id, headers=True, columns=None):
)
sheet = get_sheet(spreadsheet, sheet_id)
rows = sheet.findall(TABLE_ROW_TAG, namespaces=namespaces)
data = []
allcells = []
for row in rows:
cells = row.findall(TABLE_CELL_TAG, namespaces=namespaces)
data.append(
[
cell.find(TABLE_CELL_TEXT_TAG, namespaces=namespaces).text
for cell in cells
]
)
orig_columns = data.pop(0) if headers else None
if columns is None:
if orig_columns:
columns = orig_columns
else:
columns = [f"column.{i}" for i in range(len(data[0]))]
return pd.DataFrame(
OrderedDict({column: datarow for column, datarow in zip(columns, data)})
)
allcells.append(cells)
columns, values = parse_columns(allcells, headers, columns)
data = []
for row in values:
rowvalues = [parse_value(cell) for cell in row]
data.append(rowvalues)
final_rows = []
for row in data:
final_row = []
for i in range(len(columns)):
if i < len(row):
final_row.append(row[i])
else:
final_row.append(None)
final_rows.append(final_row)
return pd.DataFrame(final_rows, columns=columns)
def read_fods(file_or_path, sheet=1, headers=True, columns=None):
doc = etree.parse(file_or_path)
def read(file_or_path, sheet=1, headers=True, columns=None):
doc = etree.parse(str(file_or_path))
df = load_fods(doc, sheet, headers=headers, columns=columns)
return df
return sanitize_df(df)

View File

@@ -1,7 +1,10 @@
from collections import OrderedDict
import ezodf
import pandas as pd
from ..tools import sanitize_df
def load_ods(doc, sheet_id, headers=True, columns=None):
# convert the sheet to a pandas.DataFrame
@@ -51,3 +54,26 @@ def load_ods(doc, sheet_id, headers=True, columns=None):
continue
df = pd.DataFrame(df_dict)
return df
def read(file_or_path, sheet=1, headers=True, columns=None):
    """Read an ODS file into a sanitized pandas DataFrame.

    Opens the document with ezodf, converts the requested sheet via
    ``load_ods``, and strips trailing empty rows/columns before returning.

    :param file_or_path: str
        the path to the ODS file
    :param sheet: int or str, default 1
        if int, the 1 based index of the sheet to be read in. If str, the
        name of the sheet to be read in
    :param headers: bool, default True
        if True, the first row is read in as headers
    :param columns: list, default None
        a list of column names to be used as headers
    :returns: pandas.DataFrame
        the ODS file as a pandas DataFrame
    """
    document = ezodf.opendoc(file_or_path)
    frame = load_ods(document, sheet, headers, columns)
    return sanitize_df(frame)