from pandas.errors import EmptyDataError
from pyspainmobility.utils import utils
import os
import pandas as pd
import tqdm
import warnings
from os.path import expanduser
from typing import Optional
# Optional Arrow import – used when backend='arrow'
try:
import pyarrow as pa
import pyarrow.csv as pacsv
except ImportError:
pa = None
pacsv = None
# Optional Dask import – only used when caller sets use_dask=True
try:
import dask.dataframe as dd
from dask import delayed
except ImportError:
dd = None
delayed = None
class Mobility:
"""
This object takes care of downloading and preprocessing (i) daily origin-destination matrices, (ii) overnight stays, and (iii) the number of trips.
The data is downloaded from the Spanish Ministry of Transport, Mobility and Urban Agenda (MITMA) Open Data portal.
Additional information can be found at https://www.transportes.gob.es/ministerio/proyectos-singulares/estudio-de-movilidad-con-big-data.
The data is available for two versions: version 1 (2020-02-14 to 2021-05-09) and version 2 (2022-01-01 onward).
Data are available at different levels of granularity: districts (distritos), municipalities (municipios) and large urban areas (grandes áreas urbanas).
Concerning version 1, data at the LUA level are not available, and neither are overnight stays.
Parameters
----------
version : int
The version of the data to download. Default is 2. Version must be 1 or 2. Version 1 contains the data from 2020 to 2021. Version 2 contains the data from 2022 onwards.
zones : str
The zones to download the data for. Default is municipalities. Zones must be one of the following: districts, dist, distr, distritos, municipalities, muni, municipal, municipios, lua, large_urban_areas, gau, gaus, grandes_areas_urbanas
start_date : str
The start date of the data to download. Date must be in the format YYYY-MM-DD. A start date is required.
end_date : str
The end date of the data to download. Default is None. Date must be in the format YYYY-MM-DD. If not specified, the end date will be the same as the start date.
output_directory : str
The directory to save the raw data and the processed parquet. Default is None. If not specified, the data will be saved in a folder named 'data' in the user's home directory.
use_dask : bool
Whether to use Dask for processing large datasets. Default is False. Requires dask to be installed.
backend : str
Dataframe backend used while reading and processing files. Use
'arrow' (default) for Apache Arrow-backed pandas columns
(typically faster and more memory-efficient) or 'pandas' for
classic pandas dtypes. If 'arrow' is requested but pyarrow is not
installed, the class automatically falls back to 'pandas' and
emits a warning.
Examples
--------
>>> from pyspainmobility import Mobility
>>> # instantiate the object
>>> mobility_data = Mobility(version=2, zones='municipalities', start_date='2022-01-01', end_date='2022-01-06', output_directory='/Desktop/spain/data/')
>>> # download and save the origin-destination data
>>> mobility_data.get_od_data(keep_activity=True)
>>> # download and save the overnight stays data
>>> mobility_data.get_overnight_stays_data()
>>> # download and save the number of trips data
>>> mobility_data.get_number_of_trips_data()
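>>> # optionally force classic pandas dtypes instead of Arrow-backed columns
>>> mobility_data = Mobility(version=2, zones='municipalities', start_date='2022-01-01', backend='pandas')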
"""
def __init__(
self,
version: int = 2,
zones: str = 'municipalities',
start_date: Optional[str] = None,
end_date: Optional[str] = None,
output_directory: Optional[str] = None,
use_dask: bool = False,
backend: str = "arrow",
):
self.version = version
self.zones = zones
self.start_date = start_date
self.output_directory = output_directory
self.use_dask = use_dask
self.backend = str(backend).lower()
if self.backend not in {"arrow", "pandas"}:
raise ValueError("backend must be either 'arrow' or 'pandas'")
if self.backend == "arrow" and (pa is None or pacsv is None):
warnings.warn(
"backend='arrow' requested but pyarrow is not installed. "
"Falling back to backend='pandas'. Install pyarrow for better "
"performance and lower memory usage.",
RuntimeWarning,
stacklevel=2,
)
self.backend = "pandas"
if self.use_dask and dd is None:
raise ImportError("Dask is not installed. Please install dask to use use_dask=True")
utils.zone_assert(zones, version)
utils.version_assert(version)
if start_date is None:
raise ValueError("start_date is required")
utils.date_format_assert(start_date)
if end_date is None:
end_date = start_date
utils.date_format_assert(end_date)
self.end_date = end_date
# --- fail fast if the end date is before the start date ---
if pd.to_datetime(end_date) < pd.to_datetime(start_date):
raise ValueError(
f"end_date ({end_date}) must be the same as or after start_date ({start_date})."
)
self.zones = utils.zone_normalization(zones)
data_directory = utils.get_data_directory()
self.dates = utils.get_dates_between(start_date, end_date)
try:
valid_dates = utils.get_valid_dates(self.version)
except Exception as exc:
raise RuntimeError(
"Could not reach the MITMA open-data server while fetching the list of available dates "
f"for version {self.version}. This is usually a temporary problem on the government's side "
"(e.g. HTTP 500 / service maintenance). Please wait a few minutes and try again. "
f"Original error: {exc}"
) from exc
if not valid_dates:
raise RuntimeError(
f"Could not resolve valid dates for version {self.version}. "
"Please check network/data source availability and try again."
)
first, last = valid_dates[0], valid_dates[-1]
if self.dates[0] < first or self.dates[-1] > last:
raise ValueError(
f"Version {self.version} data are only available from {first} to {last}. "
f"You requested from {self.start_date} to {self.end_date}.")
# proper directory handling
if output_directory is not None:
if os.path.isabs(output_directory):
# Preserve absolute paths even if parent directories do not exist yet.
self.output_path = output_directory
else:
# Treat relative paths as relative to home directory.
home = expanduser("~")
clean_path = output_directory.lstrip("/\\")
self.output_path = os.path.join(home, clean_path)
else:
self.output_path = data_directory
# Ensure the output directory exists
try:
os.makedirs(self.output_path, exist_ok=True)
except PermissionError as e:
raise PermissionError(f"Cannot create directory {self.output_path}. Please check permissions or use a different path. Error: {e}") from e
except Exception as e:
raise RuntimeError(f"Error creating directory {self.output_path}: {e}") from e
if self.version == 2:
if self.zones == 'gaus':
self.zones = 'GAU'
def _read_pipe_file(self, filepath: str, dtype: Optional[dict] = None) -> pd.DataFrame:
"""
Read MITMA pipe-separated files using the configured backend.
"""
if self.backend == "arrow":
return self._read_pipe_file_arrow(filepath, dtype=dtype)
return self._read_pipe_file_pandas(filepath, dtype=dtype)
@staticmethod
def _normalize_column_name(name: str) -> str:
"""
Normalize a single column name for case-insensitive matching.
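A minimal illustration (hypothetical header value):
>>> Mobility._normalize_column_name('\ufeffFecha ')
'fecha'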
"""
return str(name).replace("\ufeff", "").strip().lower()
@staticmethod
def _align_dtype_map_to_source_columns(filepath: str, dtype: Optional[dict] = None) -> Optional[dict]:
"""
Align dtype mapping keys to real source column names, handling
BOM/case/whitespace differences before parsing.
"""
if not dtype:
return dtype
try:
header_df = pd.read_csv(
filepath,
sep="|",
compression="infer",
encoding="utf-8-sig",
nrows=0,
)
except Exception:
return dtype
normalized_to_source = {
Mobility._normalize_column_name(col): col for col in header_df.columns
}
aligned = {}
for requested_col, requested_dtype in dtype.items():
source_col = normalized_to_source.get(
Mobility._normalize_column_name(requested_col)
)
if source_col is not None:
aligned[source_col] = requested_dtype
return aligned or dtype
@staticmethod
def _read_pipe_file_pandas(filepath: str, dtype: Optional[dict] = None) -> pd.DataFrame:
"""
Pandas parser with BOM-safe UTF-8 handling.
"""
aligned_dtype = Mobility._align_dtype_map_to_source_columns(filepath, dtype)
return pd.read_csv(
filepath,
sep="|",
compression="infer",
encoding="utf-8-sig",
dtype=aligned_dtype,
low_memory=False,
)
@staticmethod
def _read_pipe_file_arrow(filepath: str, dtype: Optional[dict] = None) -> pd.DataFrame:
"""
Apache Arrow CSV parser, converted to an Arrow-backed pandas DataFrame.
Falls back to pandas parser if Arrow cannot parse the source.
"""
if pa is None or pacsv is None:
warnings.warn(
"pyarrow is not available. Falling back to pandas parser. "
"Install pyarrow for better performance and lower memory usage.",
RuntimeWarning,
stacklevel=2,
)
return Mobility._read_pipe_file_pandas(filepath, dtype=dtype)
column_types = None
aligned_dtype = Mobility._align_dtype_map_to_source_columns(filepath, dtype)
if aligned_dtype:
column_types = {}
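# Forward only the string dtype requests to Arrow; other columns keep Arrow's type inference.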
for col, typ in aligned_dtype.items():
if str(typ).lower() == "string":
column_types[col] = pa.string()
try:
table = pacsv.read_csv(
filepath,
read_options=pacsv.ReadOptions(encoding="utf8", use_threads=True),
parse_options=pacsv.ParseOptions(delimiter="|"),
convert_options=pacsv.ConvertOptions(
strings_can_be_null=True,
column_types=column_types,
),
)
return table.to_pandas(types_mapper=pd.ArrowDtype)
except Exception as exc:
print(f"[warn] Arrow parser failed for {filepath}: {exc}. Falling back to pandas parser.")
return Mobility._read_pipe_file_pandas(filepath, dtype=dtype)
def _finalize_backend_dataframe(self, df: pd.DataFrame) -> pd.DataFrame:
"""
Normalize output dtypes according to the selected backend.
"""
if self.backend != "arrow" or df is None:
return df
try:
return df.convert_dtypes(dtype_backend="pyarrow")
except TypeError:
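# Older pandas versions do not support the dtype_backend keyword.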
return df.convert_dtypes()
@staticmethod
def _normalize_input_columns(df: pd.DataFrame) -> pd.DataFrame:
"""
Normalize incoming column names to avoid translation misses caused by
BOMs, casing differences, and surrounding whitespace.
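A minimal illustration (hypothetical headers):
>>> import pandas as pd
>>> df = pd.DataFrame(columns=['\ufeffFecha', ' Origen '])
>>> Mobility._normalize_input_columns(df).columns.tolist()
['fecha', 'origen']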
"""
normalized = df.copy()
normalized.columns = [
str(col).replace("\ufeff", "").strip().lower() for col in normalized.columns
]
return normalized
@staticmethod
def _normalize_identifier_series(series: pd.Series) -> pd.Series:
"""
Keep zoning identifiers as strings and remove float artifacts/grouping
separators (e.g. '01001.0' -> '01001', '28.079' -> '28079').
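A minimal illustration (example values):
>>> import pandas as pd
>>> Mobility._normalize_identifier_series(pd.Series(['01001.0', '28.079', '']))
0    01001
1    28079
2     <NA>
dtype: string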
"""
normalized = series.astype("string").str.strip()
normalized = normalized.replace({"": pd.NA, "NA": pd.NA, "nan": pd.NA, "None": pd.NA})
normalized = normalized.str.replace(r"\.0+$", "", regex=True)
normalized = normalized.str.replace(r"(?<=\d)\.(?=\d)", "", regex=True)
return normalized
@staticmethod
def _normalize_date_series(series: pd.Series) -> pd.Series:
"""
Convert MITMA date formats to YYYY-MM-DD.
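A minimal illustration (example values):
>>> import pandas as pd
>>> Mobility._normalize_date_series(pd.Series(['20220101', '2022/01/02']))
0    2022-01-01
1    2022-01-02
dtype: string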
"""
normalized = series.astype("string").str.strip()
normalized = normalized.replace({"": pd.NA, "NA": pd.NA, "nan": pd.NA, "None": pd.NA})
normalized = normalized.str.replace(r"\.0+$", "", regex=True)
normalized = normalized.str.replace("-", "", regex=False)
normalized = normalized.str.replace("/", "", regex=False)
normalized = normalized.str.replace(" ", "", regex=False)
normalized = normalized.str.zfill(8)
return (
normalized.str.slice(0, 4)
+ "-"
+ normalized.str.slice(4, 6)
+ "-"
+ normalized.str.slice(6, 8)
)
@staticmethod
def _to_numeric(series: pd.Series, strip_thousands: bool = False) -> pd.Series:
"""
Safe numeric conversion with support for comma decimal separators.
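A minimal illustration (example values):
>>> import pandas as pd
>>> Mobility._to_numeric(pd.Series(['3,5', '1.234.567']), strip_thousands=True)
0          3.5
1    1234567.0
dtype: float64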
"""
normalized = series.astype("string").str.strip()
normalized = normalized.replace({"": pd.NA, "NA": pd.NA, "nan": pd.NA, "None": pd.NA})
normalized = normalized.str.replace(",", ".", regex=False)
if strip_thousands:
thousands_mask = normalized.str.fullmatch(r"[+-]?\d{1,3}(?:\.\d{3})+").fillna(False)
has_multi_group_separator = normalized.str.fullmatch(r"[+-]?\d{1,3}(?:\.\d{3}){2,}").fillna(False).any()
if has_multi_group_separator:
normalized = normalized.where(~thousands_mask, normalized.str.replace(".", "", regex=False))
return pd.to_numeric(normalized, errors="coerce")
@staticmethod
def _to_mitma_integer(series: pd.Series) -> pd.Series:
"""
Parse MITMA count-like fields to integers.
MITMA files encode large integers with dots (e.g. ``2214.577``), which
pandas can otherwise interpret as decimals. This parser follows the
project convention used in examples/issues:
- ``1.0`` -> ``1``
- ``2214.577`` -> ``2214577``
- ``128.457`` -> ``128457``
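A minimal illustration (example values):
>>> import pandas as pd
>>> Mobility._to_mitma_integer(pd.Series(['1.0', '2214.577']))
0          1
1    2214577
dtype: Int64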
"""
normalized = series.astype("string").str.strip()
normalized = normalized.replace({"": pd.NA, "NA": pd.NA, "nan": pd.NA, "None": pd.NA})
normalized = normalized.str.replace(",", ".", regex=False)
# Keep integer-like values as-is (e.g. "1.0"), but compact dot-separated
# values used as grouped counts.
integer_like = normalized.str.fullmatch(r"[+-]?\d+(?:\.0+)?").fillna(False)
compacted = normalized.where(integer_like, normalized.str.replace(".", "", regex=False))
return pd.to_numeric(compacted, errors="coerce").astype("Int64")
def _process_single_od_file(self, filepath, keep_activity, social_agg):
"""Extract common OD file processing logic."""
print(f"Processing file: {filepath}")
# Check if file exists and get size
if not os.path.exists(filepath):
print(f"[ERROR] File does not exist: {filepath}")
return None
file_size = os.path.getsize(filepath)
if file_size == 0:
print(f"[warn] {os.path.basename(filepath)} is actually empty (0 bytes), skipped")
return None
try:
print(f"Reading {'gzipped' if filepath.endswith('.gz') else 'regular'} file...")
df = self._read_pipe_file(
filepath,
dtype={
"fecha": "string",
"periodo": "string",
"origen": "string",
"destino": "string",
"actividad_origen": "string",
"actividad_destino": "string",
"residencia": "string",
"renta": "string",
"edad": "string",
"sexo": "string",
"viajes": "string",
"viajes_km": "string",
},
)
df = self._normalize_input_columns(df)
if df.empty:
print(f"[warn] {os.path.basename(filepath)} contains no data rows, skipped")
return None
except EmptyDataError:
print(f"[warn] {os.path.basename(filepath)} triggered EmptyDataError, skipped")
return None
except Exception as e:
print(f"[ERROR] Error reading {filepath}: {e}")
return None
df.rename(
columns={
"fecha": "date",
"periodo": "hour",
"origen": "id_origin",
"destino": "id_destination",
"actividad_origen": "activity_origin",
"actividad_destino": "activity_destination",
"residencia": "residence_province_ine_code",
"distancia": "distance",
"viajes": "n_trips",
"viajes_km": "trips_total_length_km",
# socio-demo
"renta": "income",
"edad": "age",
"sexo": "gender",
},
inplace=True,
)
required_cols = ["date", "hour", "id_origin", "id_destination", "n_trips", "trips_total_length_km"]
missing = [col for col in required_cols if col not in df.columns]
if missing:
print(
f"[warn] {os.path.basename(filepath)} missing expected columns after translation: {missing}. "
f"Columns found: {list(df.columns)}"
)
return None
for optional_col in ["activity_origin", "activity_destination", "income", "age", "gender"]:
if optional_col not in df.columns:
df[optional_col] = pd.NA
df["date"] = self._normalize_date_series(df["date"])
df["id_origin"] = self._normalize_identifier_series(df["id_origin"])
df["id_destination"] = self._normalize_identifier_series(df["id_destination"])
if "residence_province_ine_code" in df.columns:
df["residence_province_ine_code"] = self._normalize_identifier_series(df["residence_province_ine_code"])
hour_numeric = self._to_numeric(df["hour"])
if hour_numeric.notna().all():
df["hour"] = hour_numeric.astype(int)
else:
df["hour"] = df["hour"].astype("string").str.strip()
df["n_trips"] = self._to_numeric(df["n_trips"], strip_thousands=True)
df["trips_total_length_km"] = self._to_numeric(df["trips_total_length_km"], strip_thousands=True)
df.dropna(
subset=["date", "id_origin", "id_destination", "n_trips", "trips_total_length_km"],
inplace=True,
)
if df.empty:
print(f"[warn] {os.path.basename(filepath)} has no valid rows after preprocessing, skipped")
return None
# map activity / gender labels
df.replace(
{
"activity_origin": {
"casa": "home",
"frecuente": "other_frequent",
"trabajo_estudio": "work_or_study",
"no_frecuente": "other_non_frequent",
},
"activity_destination": {
"casa": "home",
"frecuente": "other_frequent",
"trabajo_estudio": "work_or_study",
"no_frecuente": "other_non_frequent",
},
"gender": {"hombre": "male", "mujer": "female"},
},
inplace=True,
)
# ------------------------------------------------------
# BUILD GROUP-BY KEY ACCORDING TO THE TWO FLAGS
# ------------------------------------------------------
group_cols = ["date", "hour", "id_origin", "id_destination"]
if keep_activity:
group_cols += ["activity_origin", "activity_destination"]
if social_agg:
group_cols += ["income", "age", "gender"]
df = df.groupby(group_cols, as_index=False)[["n_trips", "trips_total_length_km"]].sum()
return df
def get_od_data(self, keep_activity: bool = False, return_df: bool = False, social_agg: bool = False):
"""
Function to download and save the origin-destination data.
Parameters
----------
keep_activity : bool
Default value is False. If True, the columns 'activity_origin' and 'activity_destination' will be kept in the final dataframe. If False, the columns will be dropped.
The columns contain the activity of the origin and destination zones. The possible values are: 'home', 'work_or_study', 'other_frequent', 'other_non_frequent'.
Consider that keeping the activity columns will increase the size of the final dataframe and the saved files significantly.
return_df : bool
Default value is False. If True, the function will return the dataframe in addition to saving it to a file.
social_agg : bool
Default value is False. If True, adds a socio-demographic breakdown with the following categories (see the usage example at the end of the Examples section):
• income: <10k €, 10-15k €, >15k €
• age: 0-24, 25-44, 45-64, >65 years, NA
• gender: male, female, NA
Examples
--------
>>> from pyspainmobility import Mobility
>>> # instantiate the object
>>> mobility_data = Mobility(version=2, zones='municipalities', start_date='2022-01-01', end_date='2022-01-06', output_directory='/Desktop/spain/data/')
>>> # download and save the origin-destination data
>>> mobility_data.get_od_data(keep_activity=True)
>>> # download and save the od data and return the dataframe
>>> df = mobility_data.get_od_data(keep_activity=False, return_df=True)
>>> print(df.head())
date hour id_origin id_destination n_trips trips_total_length_km
0 2023-04-01 0 01001 01001 5.006 19.878000
1 2023-04-01 0 01001 01009_AM 14.994 70.697000
2 2023-04-01 0 01001 01058_AM 9.268 87.698000
3 2023-04-01 0 01001 01059 42.835 512.278674
4 2023-04-01 0 01001 48036 2.750 147.724000
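>>> # include the socio-demographic breakdown (adds income, age and gender columns)
>>> df = mobility_data.get_od_data(social_agg=True, return_df=True)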
"""
if self.version == 2:
m_type = "Viajes"
local_list = self._download_helper(m_type)
temp_dfs = []
print("Generating parquet file for ODs....")
if self.use_dask:
# Use Dask for processing
return self._process_od_data_dask(local_list, m_type, keep_activity, social_agg, return_df)
else:
# Original pandas processing using extracted method
for f in tqdm.tqdm(local_list):
result = self._process_single_od_file(f, keep_activity, social_agg)
if result is not None:
temp_dfs.append(result)
if not temp_dfs:
print("No valid data found")
return None
print("Concatenating all the dataframes....")
df = temp_dfs[0] if len(temp_dfs) == 1 else pd.concat(temp_dfs, ignore_index=True)
df = self._finalize_backend_dataframe(df)
self._saving_parquet(df, m_type)
return df if return_df else None
elif self.version == 1:
m_type = "maestra1"
local_list = self._download_helper(m_type)
temp_dfs = []
print("Generating parquet file for ODs....")
if self.use_dask:
# Use Dask for processing
return self._process_od_data_dask(local_list, m_type, False, False, return_df)
else:
# Original pandas processing using extracted method
for f in tqdm.tqdm(local_list):
result = self._process_single_od_file(f, False, False)
if result is not None:
temp_dfs.append(result)
if not temp_dfs:
print("No valid data found")
return None
print("Concatenating all the dataframes....")
df = temp_dfs[0] if len(temp_dfs) == 1 else pd.concat(temp_dfs, ignore_index=True)
df = self._finalize_backend_dataframe(df)
self._saving_parquet(df, m_type)
return df if return_df else None
return None
def _process_od_data_dask(self, local_list, m_type, keep_activity, social_agg, return_df):
"""Process OD data using Dask for better performance with large datasets """
print("Processing with Dask...")
if delayed is None:
raise ImportError("Dask delayed is not available")
@delayed
def process_single_file(filepath):
return self._process_single_od_file(filepath, keep_activity, social_agg)
# Create delayed tasks for each file
delayed_tasks = [process_single_file(f) for f in local_list]
# Compute all delayed tasks
try:
processed_dfs = dd.compute(*delayed_tasks)
except Exception as e:
print(f"Dask computation failed: {e}")
print("Falling back to pandas processing...")
# Fallback using the same processing method
processed_dfs = []
for f in tqdm.tqdm(local_list):
result = self._process_single_od_file(f, keep_activity, social_agg)
if result is not None:
processed_dfs.append(result)
# Filter out None results and concatenate
valid_dfs = [df for df in processed_dfs if df is not None]
if not valid_dfs:
print("No valid data found")
return None
print("Concatenating results...")
df = pd.concat(valid_dfs, ignore_index=True)
df = self._finalize_backend_dataframe(df)
self._saving_parquet(df, m_type)
return df if return_df else None
def _process_single_overnight_file(self, filepath: str):
"""
Parse and normalize one overnight stays file.
"""
try:
df = self._read_pipe_file(
filepath,
dtype={
"fecha": "string",
"zona_residencia": "string",
"zona_pernoctacion": "string",
"personas": "string",
},
)
df = self._normalize_input_columns(df)
if df.empty:
return None
df.rename(
columns={
"fecha": "date",
"zona_residencia": "residence_area",
"zona_pernoctacion": "overnight_stay_area",
"personas": "people",
},
inplace=True,
)
required_cols = ["date", "residence_area", "overnight_stay_area", "people"]
missing = [col for col in required_cols if col not in df.columns]
if missing:
print(
f"[warn] {os.path.basename(filepath)} missing expected columns after translation: {missing}. "
f"Columns found: {list(df.columns)}"
)
return None
df["date"] = self._normalize_date_series(df["date"])
df["residence_area"] = self._normalize_identifier_series(df["residence_area"])
df["overnight_stay_area"] = self._normalize_identifier_series(df["overnight_stay_area"])
df["people"] = self._to_numeric(df["people"], strip_thousands=True)
df.dropna(subset=required_cols, inplace=True)
return df
except EmptyDataError:
print(f"[warn] {os.path.basename(filepath)} triggered EmptyDataError, skipped")
return None
except Exception as e:
print(f"Error processing {filepath}: {e}")
return None
def _process_single_number_of_trips_file(self, filepath: str):
"""
Parse and normalize one number-of-trips file for the active version.
"""
try:
if self.version == 2:
dtype = {
"fecha": "string",
"zona_pernoctacion": "string",
"edad": "string",
"sexo": "string",
"numero_viajes": "string",
"personas": "string",
}
rename_map = {
"fecha": "date",
"zona_pernoctacion": "overnight_stay_area",
"edad": "age",
"sexo": "gender",
"numero_viajes": "number_of_trips",
"personas": "people",
}
else:
dtype = {
"fecha": "string",
"distrito": "string",
"numero_viajes": "string",
"personas": "string",
}
rename_map = {
"fecha": "date",
"distrito": "overnight_stay_area",
"numero_viajes": "number_of_trips",
"personas": "people",
}
df = self._read_pipe_file(filepath, dtype=dtype)
df = self._normalize_input_columns(df)
if df.empty:
return None
df.rename(columns=rename_map, inplace=True)
required_cols = ["date", "overnight_stay_area", "number_of_trips", "people"]
missing = [col for col in required_cols if col not in df.columns]
if missing:
print(
f"[warn] {os.path.basename(filepath)} missing expected columns after translation: {missing}. "
f"Columns found: {list(df.columns)}"
)
return None
if "age" not in df.columns:
df["age"] = pd.NA
if "gender" not in df.columns:
df["gender"] = pd.NA
df["date"] = self._normalize_date_series(df["date"])
df["overnight_stay_area"] = self._normalize_identifier_series(df["overnight_stay_area"])
df["number_of_trips"] = df["number_of_trips"].astype("string").str.strip().str.replace(r"\.0+$", "", regex=True)
df["people"] = self._to_numeric(df["people"], strip_thousands=True)
df.replace({"gender": {"hombre": "male", "mujer": "female"}}, inplace=True)
df.dropna(subset=["date", "overnight_stay_area", "number_of_trips", "people"], inplace=True)
return df
except EmptyDataError:
print(f"[warn] {os.path.basename(filepath)} triggered EmptyDataError, skipped")
return None
except Exception as e:
print(f"Error processing {filepath}: {e}")
return None
def get_overnight_stays_data(self, return_df: bool = False):
"""
Function to download and save the overnight stays data.
Parameters
----------
return_df : bool
Default value is False. If True, the function will return the dataframe in addition to saving it to a file.
Examples
--------
>>> from pyspainmobility import Mobility
>>> # instantiate the object
>>> mobility_data = Mobility(version=2, zones='municipalities', start_date='2022-01-01', end_date='2022-01-06', output_directory='/Desktop/spain/data/')
>>> # download and save the overnight stays data and return the dataframe
>>> df = mobility_data.get_overnight_stays_data(return_df=True)
>>> print(df.head())
date residence_area overnight_stay_area people
0 2023-04-01 01001 01001 2716.303
1 2023-04-01 01001 01009_AM 14.088
2 2023-04-01 01001 01017_AM 2.476
3 2023-04-01 01001 01058_AM 18.939
4 2023-04-01 01001 01059 144.118
"""
if self.version == 2:
m_type = 'Pernoctaciones'
local_list = self._download_helper(m_type)
print('Generating parquet file for Overnight Stays....')
if self.use_dask and len(local_list) > 1:
@delayed
def process_overnight_file(filepath):
return self._process_single_overnight_file(filepath)
delayed_tasks = [process_overnight_file(f) for f in local_list]
try:
processed_dfs = dd.compute(*delayed_tasks)
except Exception as e:
print(f"Dask computation failed: {e}. Falling back to pandas processing...")
processed_dfs = []
for f in tqdm.tqdm(local_list):
result = self._process_single_overnight_file(f)
if result is not None:
processed_dfs.append(result)
else:
processed_dfs = []
for f in tqdm.tqdm(local_list):
result = self._process_single_overnight_file(f)
if result is not None:
processed_dfs.append(result)
valid_dfs = [df for df in processed_dfs if df is not None]
if not valid_dfs:
print("No valid data found")
return None
print('Concatenating all the dataframes....')
df = pd.concat(valid_dfs, ignore_index=True)
df = self._finalize_backend_dataframe(df)
self._saving_parquet(df, m_type)
if return_df:
return df
elif self.version == 1:
raise ValueError('Overnight stays data is not available for version 1. Please use version 2.')
return None
def get_number_of_trips_data(self, return_df: bool = False):
"""
Function to download and save the data regarding the number of trips to an area of certain demographic categories.
Parameters
----------
return_df : bool
Default value is False. If True, the function will return the dataframe in addition to saving it to a file.
Examples
--------
>>> from pyspainmobility import Mobility
>>> # instantiate the object
>>> mobility_data = Mobility(version=2, zones='municipalities', start_date='2022-01-01', end_date='2022-01-06', output_directory='/Desktop/spain/data/')
>>> # download and save the number of trips data and return the dataframe
>>> df = mobility_data.get_number_of_trips_data(return_df=True)
>>> print(df.head())
date overnight_stay_area age gender number_of_trips people
0 2023-04-01 01001 0-25 male 0 128.457
1 2023-04-01 01001 0-25 male 1 38.537
2 2023-04-01 01001 0-25 male 2 129.136
3 2023-04-01 01001 0-25 male 2+ 129.913
4 2023-04-01 01001 0-25 female 0 188.744
"""
if self.version == 2:
m_type = 'Personas'
local_list = self._download_helper(m_type)
print('Generating parquet file for Number of Trips....')
if self.use_dask and len(local_list) > 1:
@delayed
def process_trips_file(filepath):
return self._process_single_number_of_trips_file(filepath)
delayed_tasks = [process_trips_file(f) for f in local_list]
try:
processed_dfs = dd.compute(*delayed_tasks)
except Exception as e:
print(f"Dask computation failed: {e}. Falling back to pandas processing...")
processed_dfs = []
for f in tqdm.tqdm(local_list):
result = self._process_single_number_of_trips_file(f)
if result is not None:
processed_dfs.append(result)
else:
processed_dfs = []
for f in tqdm.tqdm(local_list):
result = self._process_single_number_of_trips_file(f)
if result is not None:
processed_dfs.append(result)
valid_dfs = [df for df in processed_dfs if df is not None]
if not valid_dfs:
print("No valid data found")
return None
print('Concatenating all the dataframes....')
df = pd.concat(valid_dfs, ignore_index=True)
df = self._finalize_backend_dataframe(df)
self._saving_parquet(df, m_type)
if return_df:
return df
elif self.version == 1:
m_type = 'maestra2'
local_list = self._download_helper(m_type)
print('Generating parquet file for Number of Trips....')
if self.use_dask and len(local_list) > 1:
@delayed
def process_trips_file(filepath):
return self._process_single_number_of_trips_file(filepath)
delayed_tasks = [process_trips_file(f) for f in local_list]
try:
processed_dfs = dd.compute(*delayed_tasks)
except Exception as e:
print(f"Dask computation failed: {e}. Falling back to pandas processing...")
processed_dfs = []
for f in tqdm.tqdm(local_list):
result = self._process_single_number_of_trips_file(f)
if result is not None:
processed_dfs.append(result)
else:
processed_dfs = []
for f in tqdm.tqdm(local_list):
result = self._process_single_number_of_trips_file(f)
if result is not None:
processed_dfs.append(result)
valid_dfs = [df for df in processed_dfs if df is not None]
if not valid_dfs:
print("No valid data found")
return None
print('Concatenating all the dataframes....')
df = pd.concat(valid_dfs, ignore_index=True)
df = self._finalize_backend_dataframe(df)
self._saving_parquet(df, m_type)
if return_df:
return df
return None
def _saving_parquet(self, df: pd.DataFrame, m_type: str):
print('Writing the parquet file....')
output_file = os.path.join(self.output_path,
f"{m_type}_{self.zones}_{self.start_date}_{self.end_date}_v{self.version}.parquet")
df.to_parquet(output_file, index=False)
print('Parquet file generated successfully at', output_file)
def _download_helper(self, m_type: str):
local_list = []
if self.version == 2:
for d in self.dates:
d_first = d[:7]
d_second = d.replace("-", "")
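# e.g. d = '2022-01-01' -> d_first = '2022-01', d_second = '20220101'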
if m_type == 'Personas':
download_url = f"https://movilidad-opendata.mitma.es/estudios_basicos/por-{self.zones}/{m_type.lower()}/ficheros-diarios/{d_first}/{d_second}_{m_type}_dia_{self.zones}.csv.gz"
else:
download_url = f"https://movilidad-opendata.mitma.es/estudios_basicos/por-{self.zones}/{m_type.lower()}/ficheros-diarios/{d_first}/{d_second}_{m_type}_{self.zones}.csv.gz"
print('Downloading file from', download_url)
try:
utils.download_file_if_not_existing(download_url,os.path.join(self.output_path, f"{d_second}_{m_type}_{self.zones}_v{self.version}.csv.gz"))
local_list.append(os.path.join(self.output_path, f"{d_second}_{m_type}_{self.zones}_v{self.version}.csv.gz"))
except Exception as exc:
print(f"[warn] Failed to download {download_url}: {exc}")
continue
elif self.version == 1:
if self.zones == 'gaus':
raise ValueError('gaus is not a valid zone for version 1. Please use version 2 or use a different zone')
for d in self.dates:
d_first = d[:7]
d_second = d.replace("-", "")
try:
url_base = f"https://opendata-movilidad.mitma.es/{m_type}-mitma-{self.zones}/ficheros-diarios/{d_first}/{d_second}_{m_type[:-1]}_{m_type[-1]}_mitma_{self.zones[:-1]}.txt.gz"
utils.download_file_if_not_existing(url_base, os.path.join(self.output_path, f"{d_second}_{m_type}_{self.zones}_v{self.version}.txt.gz"))
local_list.append(os.path.join(self.output_path, f"{d_second}_{m_type}_{self.zones}_v{self.version}.txt.gz"))
except Exception as exc:
print(f"[warn] Failed to download {url_base}: {exc}")
continue
return local_list