Commit

linter
luabida committed Mar 6, 2023
1 parent ab4db1e commit 70b3f36
Showing 4 changed files with 1,390 additions and 1,354 deletions.
63 changes: 44 additions & 19 deletions .pre-commit-config.yaml
@@ -1,23 +1,48 @@
repos:
- repo: https://github.com/asottile/seed-isort-config
rev: v2.2.0
hooks:
- id: seed-isort-config
- repo: https://github.com/timothycrosley/isort
rev: 5.9.3
hooks:
- id: isort
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v4.1.0
hooks:
- id: end-of-file-fixer

- repo: https://github.com/psf/black
rev: 22.3.0
hooks:
- id: black
exclude: ^dist/
- repo: local
hooks:
- entry: black
id: black
name: black
exclude: |
(?x)(
docs
)
files: ""
language: system
pass_filenames: true
stages:
- commit
types:
- python
- file
- python

- entry: flake8
exclude: ^$
files: ""
id: flake8
language: python
name: flake8
pass_filenames: true
stages:
- commit
types:
- python

- repo: https://gitlab.com/pycqa/flake8
rev: 3.9.2
hooks:
- id: flake8
types:
- python
- entry: isort
exclude: "^.*/js/.*$"
files: ""
id: isort
language: python
name: isort
pass_filenames: true
stages:
- commit
types:
- python
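
This hunk appears to replace the upstream linter repos with local hooks for black, flake8 and isort; once that is in place the linters are driven through pre-commit's standard CLI, for example (assuming black, flake8 and isort are already installed in the active environment, since the hooks use language: system and language: python):

pre-commit install
pre-commit run --all-files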
125 changes: 70 additions & 55 deletions pysus/classes/sinan/__init__.py
@@ -1,18 +1,18 @@
import os
import re
from ftplib import FTP
from pathlib import Path
from typing import List, Union

import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

from ftplib import FTP
from dbfread import DBF
from pathlib import Path
from typing import List, Union
from sqlalchemy import VARCHAR, DATE, NUMERIC, INTEGER
from pysus.utilities.readdbc import dbc2dbf
from sqlalchemy import DATE, INTEGER, NUMERIC, VARCHAR

from .diseases import DISEASE_CODE
from .typecast import COLUMN_TYPE
from pysus.utilities.readdbc import dbc2dbf


class SINAN:
@@ -26,16 +26,16 @@ class SINAN:

diseases = list(DISEASE_CODE.keys())

def available_years(self, disease: str, stage: str = 'all') -> list:
def available_years(self, disease: str, stage: str = "all") -> list:
return Disease(disease).get_years(stage)

def download_parquets(
disease: str,
years: List[Union[(int, str)]] = None,
data_path: str = '/tmp/pysus',
data_path: str = "/tmp/pysus",
) -> None:
_disease = Disease(disease)
ftp = FTP('ftp.datasus.gov.br')
ftp = FTP("ftp.datasus.gov.br")

if not years:
_years = _disease.get_years()
@@ -46,34 +46,34 @@ def download_parquets(
Path(data_path).mkdir(parents=True, exist_ok=True)

for path in _paths:
filename = str(path).split('/')[-1]
filename = str(path).split("/")[-1]
filepath = Path(data_path) / filename
parquet_dir = f'{str(filepath)[:-4]}.parquet'
parquet_dir = f"{str(filepath)[:-4]}.parquet"
Path(parquet_dir).mkdir(exist_ok=True, parents=True)
if not any(os.listdir(parquet_dir)):
ftp.login()
ftp.retrbinary(f'RETR {path}', open(filepath, 'wb').write)
ftp.retrbinary(f"RETR {path}", open(filepath, "wb").write)
parquet_dir = _dbc_to_parquet_chunks(str(filepath))
print(f'[INFO] {_disease} at {parquet_dir}')
print(f"[INFO] {_disease} at {parquet_dir}")

def parquets_to_df(
disease: str, year: Union[(str, int)], data_path='/tmp/pysus'
disease: str, year: Union[(str, int)], data_path="/tmp/pysus"
) -> pd.DataFrame:
dis = Disease(disease)
_year = str(year)[-2:].zfill(2)
parquet_dir = Path(data_path) / f'{dis.code}BR{_year}.parquet'
parquet_dir = Path(data_path) / f"{dis.code}BR{_year}.parquet"

if parquet_dir.exists() and any(os.listdir(parquet_dir)):
chunks = parquet_dir.glob('*.parquet')
chunks = parquet_dir.glob("*.parquet")
chunks_df = [
_convert_df_types(
pd.read_parquet(str(f), engine='fastparquet')
pd.read_parquet(str(f), engine="fastparquet")
)
for f in chunks
]
df = pd.concat(chunks_df, ignore_index=True)
objs = df.select_dtypes(object)
df[objs.columns] = objs.apply(lambda x: x.str.replace('\x00', ''))
df[objs.columns] = objs.apply(lambda x: x.str.replace("\x00", ""))
return df
else:
return pd.DataFrame
@@ -82,15 +82,15 @@ def metadata_df(disease: str) -> pd.DataFrame:
code = DISEASE_CODE[disease]
metadata_file = (
Path(__file__).parent.parent.parent
/ 'metadata'
/ 'SINAN'
/ f'{code}.tar.gz'
/ "metadata"
/ "SINAN"
/ f"{code}.tar.gz"
)
df = pd.read_csv(
metadata_file,
compression='gzip',
compression="gzip",
header=0,
sep=',',
sep=",",
quotechar='"',
error_bad_lines=False,
)
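
A side note on the read_csv call above, not part of this commit: error_bad_lines was deprecated in pandas 1.3 and removed in pandas 2.0 in favour of on_bad_lines, so on newer pandas a roughly equivalent call would be:

df = pd.read_csv(
    metadata_file,
    compression="gzip",
    header=0,
    sep=",",
    quotechar='"',
    on_bad_lines="warn",  # closest match to error_bad_lines=False with the default warn_bad_lines=True
)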
@@ -108,11 +108,11 @@ def __diseasecheck__(self, name: str) -> str:
return (
name
if name in DISEASE_CODE.keys()
else ValueError(f'{name} not found.')
else ValueError(f"{name} not found.")
)

def __repr__(self) -> str:
return f'SINAN Disease ({self.name})'
return f"SINAN Disease ({self.name})"

def __str__(self) -> str:
return self.name
@@ -121,25 +121,27 @@ def __str__(self) -> str:
def code(self) -> str:
return DISEASE_CODE[self.name]

def get_years(self, stage: str = 'all') -> list:
def get_years(self, stage: str = "all") -> list:
"""
Returns the available years to download, if no stage
is passed, it will return years from both finals and
preliminaries datasets.
stage (str): 'finais' | 'prelim' | 'all'
"""

extract_years = lambda paths: [
str(path).split('/')[-1].split('.dbc')[0][-2:] for path in paths
]
def extract_years(paths):
return [
str(path).split("/")[-1].split(".dbc")[0][-2:]
for path in paths
]

p = _ftp_list_datasets_paths
prelim_years = extract_years(p(self.name, 'prelim'))
finais_years = extract_years(p(self.name, 'finais'))
prelim_years = extract_years(p(self.name, "prelim"))
finais_years = extract_years(p(self.name, "finais"))

if stage == 'prelim':
if stage == "prelim":
return sorted(prelim_years)
elif stage == 'finais':
elif stage == "finais":
return sorted(finais_years)
return sorted(prelim_years + finais_years)
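
The lambda-to-def rewrites in this hunk and the next are the usual fix for flake8's E731 warning ("do not assign a lambda expression, use a def"); a minimal illustration of the pattern, independent of this file:

# E731: flake8 flags a lambda assigned to a name ...
mask = lambda year: str(year)[-2:].zfill(2)   # flagged (E731)

# ... and asks for an equivalent def instead
def mask(year):
    return str(year)[-2:].zfill(2)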

@@ -151,11 +153,14 @@ def get_ftp_paths(self, years: list) -> list:
in the result
"""
p = _ftp_list_datasets_paths
prelim_paths = p(self.name, 'prelim')
finais_paths = p(self.name, 'finais')
prelim_paths = p(self.name, "prelim")
finais_paths = p(self.name, "finais")
all_paths = prelim_paths + finais_paths
ds_paths = list()
mask = lambda _year: str(_year)[-2:].zfill(2)

def mask(_year):
return str(_year)[-2:].zfill(2)

for year in years:
[ds_paths.append(path) for path in all_paths if mask(year) in path]

@@ -166,40 +171,40 @@ def _ftp_list_datasets_paths(disease: str, stage: str) -> list:
"""
stage: 'f'|'finais' or 'p'|'prelim'
"""
datasets_path = '/dissemin/publicos/SINAN/DADOS/'
datasets_path = "/dissemin/publicos/SINAN/DADOS/"

if stage.startswith('f'):
datasets_path += 'FINAIS'
elif stage.startswith('p'):
datasets_path += 'PRELIM'
if stage.startswith("f"):
datasets_path += "FINAIS"
elif stage.startswith("p"):
datasets_path += "PRELIM"
else:
raise ValueError(f'{stage}')
raise ValueError(f"{stage}")

code = DISEASE_CODE[disease]

ftp = FTP('ftp.datasus.gov.br')
ftp = FTP("ftp.datasus.gov.br")
ftp.login()
ftp.cwd(datasets_path)
available_dbcs = ftp.nlst(f'{code}BR*.dbc')
available_dbcs = ftp.nlst(f"{code}BR*.dbc")

return [f'{ftp.pwd()}/{dbc}' for dbc in available_dbcs]
return [f"{ftp.pwd()}/{dbc}" for dbc in available_dbcs]


def _dbc_to_parquet_chunks(dbcfilepath: str) -> str:
"""
Converts .dbc file to parquet chunks, removing the leftover files.
Returns the parquet dir path.
"""
dbffilepath = f'{dbcfilepath[:-4]}.dbf'
parquetpath = f'{dbcfilepath[:-4]}.parquet'
dbffilepath = f"{dbcfilepath[:-4]}.dbf"
parquetpath = f"{dbcfilepath[:-4]}.parquet"

dbc2dbf(dbcfilepath, dbffilepath)
Path(dbcfilepath).unlink()

for d in _stream_DBF(DBF(dbffilepath, encoding='iso-8859-1', raw=True)):
for d in _stream_DBF(DBF(dbffilepath, encoding="iso-8859-1", raw=True)):
try:
df = pd.DataFrame(d).applymap(
lambda x: x.decode(encoding='iso-8859-1')
lambda x: x.decode(encoding="iso-8859-1")
if isinstance(x, bytes)
else x
)
@@ -214,24 +219,34 @@ def _dbc_to_parquet_chunks(dbcfilepath: str) -> str:


def _convert_df_types(df: pd.DataFrame) -> pd.DataFrame:
"""Converts each column to its properly data types, if unable to cast, keep it as object"""
"""
Converts each column to its properly data types,
if unable to cast, keep it as object
"""
for column in df.columns:
if column in COLUMN_TYPE.keys():
try:
remove_non_utf8 = lambda x: str(x).encode('utf-8', 'surrogatepass').decode('utf-8')
remove_non_utf8 = (
lambda x: str(x)
.encode("utf-8", "surrogatepass")
.decode("utf-8")
)
df[column] = df[column].apply(remove_non_utf8)
sql_type = COLUMN_TYPE[column]
if sql_type is VARCHAR:
df = df.astype(dtype={column: 'string'})
df = df.astype(dtype={column: "string"})
elif sql_type is NUMERIC or INTEGER:
non_numeric = re.compile(r'[^0-9]')
subst_non_numerics = lambda x: re.sub(non_numeric, '', str(x))
non_numeric = re.compile(r"[^0-9]")

def subst_non_numerics(string):
return re.sub(non_numeric, "", str(string))

df[column] = df[column].apply(subst_non_numerics)
df[column] = pd.to_numeric(df[column])
elif sql_type is DATE:
df[column] = pd.to_datetime(df[column])
except Exception:
df = df.astype(dtype={column: 'object'})
df = df.astype(dtype={column: "object"})

return df
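
Taken together, the public surface of the SINAN module after this commit can be exercised roughly as follows. This is only a hedged sketch: "Dengue" stands in for whatever keys DISEASE_CODE actually contains (the mapping is not shown on this page), and the exact signatures should be checked against the full file, which this view truncates.

from pysus.classes.sinan import SINAN

sinan = SINAN()
print(sinan.diseases)                            # keys of DISEASE_CODE

years = sinan.available_years("Dengue")          # "Dengue" is a hypothetical key
SINAN.download_parquets("Dengue", years=years)   # writes parquet chunks under /tmp/pysus
df = SINAN.parquets_to_df("Dengue", year=years[-1])
print(df.dtypes)

download_parquets and parquets_to_df are called on the class here because, as defined above, they take no self parameter.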
