Source code for pyspainmobility.zones.zones

from pyspainmobility.utils import utils
import pandas as pd
import geopandas as gpd
import os
import matplotlib
from os.path import expanduser

[docs] class Zones:
[docs] def __init__(self, zones: str = 'municipalities', version: int = 2, output_directory: str = None): """ Class to handle the zoning related to the Spanish big mobility data. The class is used to download the data and process it. Selectable granularities are districts (distritos), municipalities (municipios) and large urban areas (grandes áreas urbanas). As a reminder, mobility data for the COVID-19 period (version 1) are not available for the large urban areas. Parameters ---------- zones : str The zones to download the data for. Default is municipalities. Zones must be one of the following: districts, dist, distr, distritos, municipalities, muni, municipal, municipios, lua, large_urban_areas, gau, gaus, grandes_areas_urbanas version : int The version of the data to download. Default is 2. Version must be 1 or 2. Version 1 contains the data from 2020 to 2021. Version 2 contains the data from 2022 onwards. output_directory : str The directory to save the raw data and the processed parquet. Default is None. If not specified, the data will be saved in a folder named 'data' in user's home directory. Examples -------- >>> from pyspainmobility import Zones >>> # instantiate the object >>> zones = Zones(zones='municipalities', version=2, output_directory='data') >>> # get the geodataframe with the zones >>> gdf = zones.get_zone_geodataframe() >>> print(gdf.head()) name population ID 01001 Alegría-Dulantzi 2925.0 01002 Amurrio 10307.0 01004_AM Artziniega agregacion de municipios 3005.0 01009_AM Asparrena agregacion de municipios 4599.0 """ utils.version_assert(version) utils.zone_assert(zones, version) self.version = version self.zones = utils.zone_normalization(zones) self.complete_df = None self._zoning_links = None self._downloads_ready = False # Get the data directory data_directory = utils.get_data_directory() self.data_directory = data_directory # Proper output directory handling if output_directory is not None: if os.path.isabs(output_directory): # Preserve absolute paths even if parent directories do not exist yet. self.output_path = output_directory else: # Treat relative paths as relative to home directory. home = expanduser("~") clean_path = output_directory.lstrip("/\\") self.output_path = os.path.join(home, clean_path) else: self.output_path = data_directory # Ensure directory exists try: os.makedirs(self.output_path, exist_ok=True) except PermissionError as e: raise PermissionError(f"Cannot create directory {self.output_path}. Please check permissions or use a different path. Error: {e}") except Exception as e: raise Exception(f"Error creating directory {self.output_path}: {e}")
def _get_zoning_links(self) -> list: """ Resolve available zoning links lazily. """ if self._zoning_links is None: self._zoning_links = ( utils.available_zoning_data(self.version, self.zones)["link"] .dropna() .unique() .tolist() ) return self._zoning_links def _ensure_zoning_files_downloaded(self) -> None: """ Download required files only when the user first requests data. """ if self._downloads_ready: return links = self._get_zoning_links() for link in links: file_name = link.split("/")[-1] local_path = os.path.join(self.output_path, file_name) if not os.path.exists(local_path): print("Downloading necessary files....") utils.download_file_if_not_existing(link, local_path) if self.version == 1 and file_name.endswith(".zip"): utils.unzip_file(local_path, self.output_path) self._downloads_ready = True def _load_zone_geodataframe(self) -> None: """ Build/load the zone geodataframe lazily on first access. """ if self.complete_df is not None: return self._ensure_zoning_files_downloaded() print("Zones already downloaded. Reading the files....") output_file_path = os.path.join(self.output_path, f"{self.zones}_{self.version}.geojson") if os.path.exists(output_file_path): print(f"File {output_file_path} already exists. Loading it...") self.complete_df = gpd.read_file(output_file_path) return if self.version == 2: def _read_pipe_csv(path, cols): """ Read a ‘|’-separated MITMA CSV that may or may not contain a header and may start with a UTF-8 BOM. Returns a tidy DataFrame. """ df = pd.read_csv( path, sep="|", dtype=str, header=None, names=cols, encoding="utf-8-sig", ) df[cols[0]] = df[cols[0]].str.strip() if df.iloc[0, 0].upper() == cols[0].upper(): df = df.iloc[1:] return df nombre = _read_pipe_csv( self._resolve_data_file(f"nombres_{self.zones}.csv"), ["ID", "name"], ) pop = ( _read_pipe_csv( self._resolve_data_file(f"poblacion_{self.zones}.csv"), ["ID", "population"], ) .replace("NA", None) ) zonification = gpd.read_file( self._resolve_data_file(f"zonificacion_{self.zones}.shp") ) if zonification.crs is None or zonification.crs.to_epsg() != 4326: zonification = zonification.to_crs(epsg=4326) for col in zonification.columns: if col.lower() in {"id", "id_1", "codigo", "codigoine", "cod_mun"}: zonification["ID"] = zonification[col].astype(str).str.strip() break else: raise KeyError("No ID-like column found in the shapefile") complete_df = ( nombre.set_index("ID") .join(pop.set_index("ID")) .join(zonification.set_index("ID")) ) complete_df = gpd.GeoDataFrame(complete_df, crs="EPSG:4326") complete_df.reset_index(inplace=True) complete_df.rename(columns={"ID": "id"}, inplace=True) complete_df.set_index("id", inplace=True) complete_df.to_file(output_file_path, driver="GeoJSON") self.complete_df = complete_df return zonification = gpd.read_file( os.path.join(self.output_path, f"zonificacion-{self.zones}/{self.zones}_mitma.shp") ) if zonification.crs is None or zonification.crs.to_epsg() != 4326: zonification = zonification.to_crs(epsg=4326) complete_df = zonification complete_df.rename(columns={"ID": "id"}, inplace=True) complete_df.set_index("id", inplace=True) complete_df.to_file(output_file_path, driver="GeoJSON") self.complete_df = complete_df def _resolve_data_file(self, filename: str) -> str: """ Resolve a data file path, preferring the instance output path and falling back to the global default data directory for backward compatibility. """ preferred = os.path.join(self.output_path, filename) if os.path.exists(preferred): return preferred fallback = os.path.join(utils.get_data_directory(), filename) if os.path.exists(fallback): return fallback return preferred def _read_relation_table(self, filename: str) -> pd.DataFrame: """ Read relation CSV files with robust delimiter detection. """ path = self._resolve_data_file(filename) for sep in ("|", ",", ";", "\t"): try: df = pd.read_csv(path, sep=sep, dtype=str, encoding="utf-8-sig") if len(df.columns) > 1: return df except Exception: continue return pd.read_csv(path, dtype=str, encoding="utf-8-sig")
[docs] def get_zone_geodataframe(self): """ Function that returns the geodataframe with the zones. The geodataframe contains the following columns: - id: the id of the zone - name: the name of the zone - population: the population of the zone (if available) Parameters ---------- Examples -------- >>> from pyspainmobility import Zones >>> # instantiate the object >>> zones = Zones(zones='municipalities', version=2, output_directory='data') >>> # get the geodataframe with the zones >>> gdf = zones.get_zone_geodataframe() >>> print(gdf.head()) name population ID 01001 Alegría-Dulantzi 2925.0 01002 Amurrio 10307.0 01004_AM Artziniega agregacion de municipios 3005.0 01009_AM Asparrena agregacion de municipios 4599.0 """ self._load_zone_geodataframe() return self.complete_df
[docs] def get_zone_relations(self): """ Return official mapping tables between INE administrative units and MITMA zoning identifiers. For version 2, the returned table includes one row per relation entry with harmonized column names. For version 1, the method returns one row per MITMA zone id, where each relation column contains the set of linked INE identifiers. Parameters ---------- None Returns ------- pandas.DataFrame Relation table between census/municipality identifiers and MITMA zoning identifiers. Examples -------- >>> from pyspainmobility import Zones >>> zones = Zones(zones='municipalities', version=2, output_directory='data') >>> rel = zones.get_zone_relations() >>> rel.columns.tolist() ['census_sections', 'census_districts', 'municipalities', 'municipalities_mitma', 'districts_mitma', 'luas_mitma'] """ self._ensure_zoning_files_downloaded() if self.version == 2: relacion = self._read_relation_table('relacion_ine_zonificacionMitma.csv') remapping = { 'seccion_ine': 'census_sections', 'distrito_ine': 'census_districts', 'municipio_ine': 'municipalities', 'municipio_mitma': 'municipalities_mitma', 'distrito_mitma': 'districts_mitma', 'gau_mitma': 'luas_mitma' } relacion.rename(columns=remapping, inplace=True) relacion = relacion.replace('NA', None) return relacion else: used_zone = self.zones[:-1] relacion = self._read_relation_table(f'relaciones_{used_zone}_mitma.csv') relacion.rename(columns={f'{used_zone}_mitma': 'id'}, inplace=True) if used_zone == 'municipio': temp = self._read_relation_table('relaciones_distrito_mitma.csv') relacion = relacion.set_index('id').join(temp.set_index('municipio_mitma')).reset_index() if used_zone == 'distrito': temp = self._read_relation_table('relaciones_municipio_mitma.csv') relacion = relacion.set_index('municipio_mitma').join(temp.set_index('municipio_mitma')).reset_index() to_rename = { 'distrito': 'census_districts', 'distrito_mitma': 'districts_mitma', 'municipio': 'municipalities', 'municipio_mitma': 'municipalities_mitma', } relacion.rename(columns=to_rename, inplace=True) temp_df = pd.DataFrame(relacion['id'].unique(), columns=['id']).set_index('id') for i in list(relacion.columns): if i != 'id': temp_df = temp_df.join(relacion.groupby('id')[i].apply(set)) return temp_df