Source code for pyspainmobility.zones.zones

from pyspainmobility.utils import utils
import pandas as pd
import geopandas as gpd
import os
import matplotlib
from os.path import expanduser

class Zones:
    def __init__(self, zones: str = None, version: int = 1, output_directory: str = None):
        """
        Class to handle the zoning related to the Spanish big mobility data. The class is used to
        download the data and process it. Selectable granularities are districts (distritos),
        municipalities (municipios) and large urban areas (grandes áreas urbanas). As a reminder,
        mobility data for the COVID-19 period (version 1) are not available for the large urban
        areas.

        Parameters
        ----------
        zones : str
            The zones to download the data for. Default is municipalities. Zones must be one of
            the following: districts, dist, distr, distritos, municipalities, muni, municipal,
            municipios, lua, large_urban_areas, gau, gaus, grandes_areas_urbanas
        version : int
            The version of the data to download. Default is 1. Version must be 1 or 2. Version 1
            contains the data from 2020 to 2021. Version 2 contains the data from 2022 onwards.
        output_directory : str
            The directory to save the raw data and the processed output. Default is None. If not
            specified, the data will be saved in a folder named 'data' in the user's home
            directory.

        Examples
        --------
        >>> from pyspainmobility import Zones
        >>> # instantiate the object
        >>> zones = Zones(zones='municipalities', version=2, output_directory='data')
        >>> # get the geodataframe with the zones
        >>> gdf = zones.get_zone_geodataframe()
        >>> print(gdf.head())
                                                 name  population
        ID
        01001                        Alegría-Dulantzi      2925.0
        01002                                 Amurrio     10307.0
        01004_AM  Artziniega agregacion de municipios      3005.0
        01009_AM   Asparrena agregacion de municipios      4599.0
        """
        utils.version_assert(version)
        utils.zone_assert(zones, version)
        self.version = version

        zones = utils.zone_normalization(zones)
        self.zones = zones

        links = utils.available_zoning_data(version, zones)['link'].unique().tolist()
        self.complete_df = None

        # Get the data directory
        data_directory = utils.get_data_directory()
        self.data_directory = data_directory

        # Proper output directory handling
        if output_directory is not None:
            # Always treat it as relative to the home directory unless it is a valid absolute system path
            if os.path.isabs(output_directory) and os.path.exists(os.path.dirname(output_directory)):
                # It's a valid absolute path
                self.output_path = output_directory
            else:
                # Treat it as relative to the home directory, stripping a leading slash if present
                home = expanduser("~")
                clean_path = output_directory.lstrip('/')
                self.output_path = os.path.join(home, clean_path)
        else:
            self.output_path = data_directory

        # Ensure the directory exists
        try:
            os.makedirs(self.output_path, exist_ok=True)
        except PermissionError as e:
            raise PermissionError(
                f"Cannot create directory {self.output_path}. "
                f"Please check permissions or use a different path. Error: {e}"
            )
        except Exception as e:
            raise Exception(f"Error creating directory {self.output_path}: {e}")

        # For each link, check if the file exists in the output directory. If not, download it
        for link in links:
            # Get the file name
            file_name = link.split('/')[-1]
            # Check if the file exists in the output directory
            local_path = os.path.join(self.output_path, file_name)
            if not os.path.exists(local_path):
                # Download the file
                print("Downloading necessary files....")
                utils.download_file_if_not_existing(link, local_path)
                # Unzip zonificacion_distritos.zip or zonificacion_municipios.zip if version is 1
                if version == 1 and file_name.endswith('.zip'):
                    utils.unzip_file(os.path.join(self.output_path, file_name), self.output_path)
            else:
                print('Zones already downloaded. Reading the files....')

        complete_df = None

        # Check if a previously processed file exists in the output directory
        output_file_path = os.path.join(self.output_path, f'{zones}_{version}.geojson')
        if os.path.exists(output_file_path):
            print(f"File {output_file_path} already exists. Loading it...")
            complete_df = gpd.read_file(output_file_path)
            self.complete_df = complete_df

        if complete_df is None and version == 2:
            def _read_pipe_csv(path, cols):
                """
                Read a '|'-separated MITMA CSV that may or may not contain a header and may start
                with a UTF-8 BOM. Returns a tidy DataFrame.
                """
                df = pd.read_csv(
                    path,
                    sep="|",
                    dtype=str,
                    header=None,  # read everything as data
                    names=cols,
                    encoding="utf-8-sig"
                )
                df[cols[0]] = df[cols[0]].str.strip()
                # Drop a stray header row, if present
                if df.iloc[0, 0].upper() == cols[0].upper():
                    df = df.iloc[1:]
                return df

            nombre = _read_pipe_csv(
                os.path.join(data_directory, f"nombres_{zones}.csv"),
                ["ID", "name"]
            )
            pop = (
                _read_pipe_csv(
                    os.path.join(data_directory, f"poblacion_{zones}.csv"),
                    ["ID", "population"]
                )
                .replace("NA", None)
            )

            zonification = gpd.read_file(
                os.path.join(data_directory, f"zonificacion_{zones}.shp")
            )
            if zonification.crs is None or zonification.crs.to_epsg() != 4326:
                zonification = zonification.to_crs(epsg=4326)

            # Find the column that holds the zone code
            for col in zonification.columns:
                if col.lower() in {"id", "id_1", "codigo", "codigoine", "cod_mun"}:
                    zonification["ID"] = zonification[col].astype(str).str.strip()
                    break
            else:
                raise KeyError("No ID-like column found in the shapefile")

            complete_df = (
                nombre.set_index("ID")
                .join(pop.set_index("ID"))
                .join(zonification.set_index("ID"))
            )
            complete_df = gpd.GeoDataFrame(complete_df, crs="EPSG:4326")
            complete_df.reset_index(inplace=True)
            complete_df.rename(columns={"ID": "id"}, inplace=True)
            complete_df.set_index("id", inplace=True)

            # Write the cache file
            complete_df.to_file(output_file_path, driver="GeoJSON")

            # Make it available to the rest of the class
            self.complete_df = complete_df

        if complete_df is None and version == 1:
            zonification = gpd.read_file(
                os.path.join(self.output_path, f"zonificacion-{zones}/{zones}_mitma.shp")
            )
            if zonification.crs is None or zonification.crs.to_epsg() != 4326:
                zonification = zonification.to_crs(epsg=4326)

            complete_df = zonification
            complete_df.rename(columns={"ID": "id"}, inplace=True)
            complete_df.set_index("id", inplace=True)

            # Write the cache file
            complete_df.to_file(output_file_path, driver="GeoJSON")

            # Make it available to the rest of the class
            self.complete_df = complete_df
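    # Usage sketch (illustrative, not part of the library source), assuming the MITMA files download
    # successfully: a relative output_directory such as 'spain_data' is resolved against the user's
    # home directory by the constructor above, and the processed zones are cached there as
    # '<normalized zones>_<version>.geojson', so a second instantiation loads the cache instead of
    # rebuilding the GeoDataFrame. The folder name 'spain_data' is only a placeholder.
    #
    # >>> from pyspainmobility import Zones
    # >>> z = Zones(zones='districts', version=2, output_directory='spain_data')
    # >>> z.output_path                      # resolves to <home>/spain_data
    # >>> z = Zones(zones='districts', version=2, output_directory='spain_data')  # reuses the cached GeoJSON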
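    # Minimal sketch of the header handling used by _read_pipe_csv above (illustrative only; it uses
    # an in-memory buffer instead of a real MITMA download). The buffer starts with a UTF-8 BOM and
    # carries a header row: reading with header=None, names=... and encoding='utf-8-sig' strips the
    # BOM, and comparing the first cell with the expected column name drops the stray header row.
    #
    # >>> import io
    # >>> import pandas as pd
    # >>> raw = 'ID|name\n01001|Alegría-Dulantzi\n01002|Amurrio\n'.encode('utf-8-sig')
    # >>> df = pd.read_csv(io.BytesIO(raw), sep='|', dtype=str, header=None,
    # ...                  names=['ID', 'name'], encoding='utf-8-sig')
    # >>> df = df.iloc[1:] if df.iloc[0, 0].upper() == 'ID' else df
    # >>> list(df['ID'])
    # ['01001', '01002']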
    def get_zone_geodataframe(self):
        """
        Function that returns the geodataframe with the zones. The geodataframe contains the
        following columns:

        - id: the id of the zone
        - name: the name of the zone
        - population: the population of the zone (if available)

        Examples
        --------
        >>> from pyspainmobility import Zones
        >>> # instantiate the object
        >>> zones = Zones(zones='municipalities', version=2, output_directory='data')
        >>> # get the geodataframe with the zones
        >>> gdf = zones.get_zone_geodataframe()
        >>> print(gdf.head())
                                                 name  population
        ID
        01001                        Alegría-Dulantzi      2925.0
        01002                                 Amurrio     10307.0
        01004_AM  Artziniega agregacion de municipios      3005.0
        01009_AM   Asparrena agregacion de municipios      4599.0
        """
        return self.complete_df
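    # Usage sketch (illustrative, not part of the library source). The returned object is a plain
    # GeoDataFrame indexed by the zone identifier, so standard geopandas operations apply; for
    # instance, a choropleth of the version-2 population column can be drawn with matplotlib
    # (assumed to be installed). The column is read from the raw CSV as text, so it is cast to
    # float first.
    #
    # >>> import matplotlib.pyplot as plt
    # >>> gdf = zones.get_zone_geodataframe()
    # >>> gdf['population'] = gdf['population'].astype(float)
    # >>> gdf.plot(column='population', legend=True)
    # >>> plt.show()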
    def get_zone_relations(self):
        """
        Function that returns the relations between the MITMA zoning and the INE administrative
        units (census sections, census districts and municipalities). For version 2, the raw
        correspondence table is returned with standardised column names; for version 1, the
        related units are grouped into sets, one row per MITMA zone id.

        Examples
        --------
        >>> from pyspainmobility import Zones
        >>> # instantiate the object
        >>> zones = Zones(zones='municipalities', version=2, output_directory='data')
        >>> # get the dataframe with the zone relations
        >>> relations = zones.get_zone_relations()
        >>> print(relations.head())
        """
        if self.version == 2:
            relacion = gpd.read_file(
                os.path.join(utils.get_data_directory(), 'relacion_ine_zonificacionMitma.csv')
            )
            remapping = {
                'seccion_ine': 'census_sections',
                'distrito_ine': 'census_districts',
                'municipio_ine': 'municipalities',
                'municipio_mitma': 'municipalities_mitma',
                'distrito_mitma': 'districts_mitma',
                'gau_mitma': 'luas_mitma'
            }
            relacion.rename(columns=remapping, inplace=True)
            relacion = relacion.replace('NA', None)
            return relacion
        else:
            used_zone = self.zones[:-1]
            relacion = gpd.read_file(
                os.path.join(utils.get_data_directory(), f'relaciones_{used_zone}_mitma.csv')
            )
            relacion.rename(columns={f'{used_zone}_mitma': 'id'}, inplace=True)

            if used_zone == 'municipio':
                temp = gpd.read_file(
                    os.path.join(utils.get_data_directory(), 'relaciones_distrito_mitma.csv')
                )
                relacion = relacion.set_index('id').join(temp.set_index('municipio_mitma')).reset_index()
            if used_zone == 'distrito':
                temp = gpd.read_file(
                    os.path.join(utils.get_data_directory(), 'relaciones_municipio_mitma.csv')
                )
                relacion = relacion.set_index('municipio_mitma').join(temp.set_index('municipio_mitma')).reset_index()

            to_rename = {
                'distrito': 'census_districts',
                'distrito_mitma': 'districts_mitma',
                'municipio': 'municipalities',
                'municipio_mitma': 'municipalities_mitma',
            }
            relacion.rename(columns=to_rename, inplace=True)

            # Collapse to one row per MITMA zone id, collecting related units into sets
            temp_df = pd.DataFrame(relacion['id'].unique(), columns=['id']).set_index('id')
            for i in list(relacion.columns):
                if i != 'id':
                    temp_df = temp_df.join(relacion.groupby('id')[i].apply(set))
            return temp_df
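# Usage sketch (illustrative, not part of the library source). For version 2 the method above returns
# the raw INE/MITMA correspondence table with the renamed columns; for version 1 it returns one row
# per MITMA zone id with the related units collected into Python sets, so a single zone can be looked
# up directly. The id '01001_AM' below is only a placeholder.
#
# >>> zones_v1 = Zones(zones='municipalities', version=1, output_directory='data')
# >>> relations = zones_v1.get_zone_relations()
# >>> relations.loc['01001_AM', 'census_districts']   # set of INE census district codes for that zone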