Python Standards

This document outlines the Python standards and best practices for the Causeway data platform. Following these standards ensures consistency, maintainability, and quality across all Python projects and scripts.

Consistent Python standards are critical for building reliable data engineering solutions. These guidelines will help ensure your Python code is well-structured, maintainable, and follows industry best practices, whether you're developing ETL pipelines, data analysis scripts, or data processing applications.

Code Style

We follow PEP 8 for our Python code style with some additional requirements:

Formatting

Code is formatted with Black and imports are sorted with isort; both run automatically via pre-commit. Install the development tools with:

# Required development tools
pip install black isort flake8 mypy

Import Conventions

Organize imports in the following order:

  1. Standard library imports
  2. Related third-party imports
  3. Local application/library specific imports

# Standard library imports
import os
import sys
from datetime import datetime

# Third-party imports
import numpy as np
import pandas as pd
from pyspark.sql import SparkSession

# Local application imports
from causeway.utils import logger
from causeway.data import transformations

Naming Conventions

Type                             | Convention                        | Example
Variables                        | Snake case                        | user_count, total_amount
Functions                        | Snake case                        | calculate_total(), transform_data()
Classes                          | Pascal case (CapWords)            | DataProcessor, UserManager
Constants                        | Uppercase with underscores        | MAX_CONNECTIONS, DEFAULT_TIMEOUT
Modules                          | Short, snake case                 | data_processors.py, utils.py
Packages                         | Short, lowercase, no underscores  | dataprocessors, utils
Internal ("protected") members   | Single leading underscore         | _internal_count, _calculate_hash()
Name-mangled ("private") members | Double leading underscore         | __sensitive_data, __secure_method()
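
A minimal sketch pulling these conventions together (all names hypothetical):

```python
MAX_CONNECTIONS = 10  # constant: uppercase with underscores


class DataProcessor:  # class: PascalCase
    def __init__(self):
        self._retry_count = 0  # internal member: single leading underscore

    def calculate_total(self, amounts):  # method: snake_case
        total_amount = sum(amounts)  # variable: snake_case
        return total_amount
```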

Descriptive Naming

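Prefer names that state intent over abbreviations that force the reader to guess. A brief illustration (the function and variable names are hypothetical):

```python
# Avoid: terse names that hide what the data and threshold mean
def proc(d, n):
    return [x for x in d if x > n]


# Prefer: names that describe the data and the rule being applied
def filter_transactions_above(transactions, minimum_amount):
    return [amount for amount in transactions if amount > minimum_amount]
```
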
Project Structure

Follow this standard project structure for all Python projects:

project_name/
├── README.md
├── requirements.txt
├── setup.py
├── .gitignore
├── .pre-commit-config.yaml
├── docs/
│   ├── index.md
│   └── api.md
├── tests/
│   ├── __init__.py
│   ├── test_module1.py
│   └── test_module2.py
├── project_name/
│   ├── __init__.py
│   ├── main.py
│   ├── module1.py
│   ├── module2.py
│   └── utils/
│       ├── __init__.py
│       └── helpers.py
└── notebooks/
    └── exploratory_analysis.ipynb

Key Structure Elements

  • README.md - project overview, setup, and usage instructions
  • requirements.txt - pinned project dependencies
  • .pre-commit-config.yaml - Git hooks that enforce formatting and linting
  • docs/ - project and API documentation
  • tests/ - test modules mirroring the package layout
  • project_name/ - the importable package containing all application code
  • notebooks/ - exploratory analysis kept separate from production code

Documentation

Docstrings

Use PEP 257-compliant docstrings in the Google style:

def transform_dataframe(df, columns=None, aggregation='sum'):
    """Transform a DataFrame by selecting columns and applying aggregation.

    Args:
        df (pandas.DataFrame): The input DataFrame to transform.
        columns (list, optional): List of column names to include.
            If None, all columns are used. Defaults to None.
        aggregation (str, optional): Type of aggregation to apply.
            Options are 'sum', 'mean', 'max', 'min'. Defaults to 'sum'.

    Returns:
        pandas.DataFrame: The transformed DataFrame.

    Raises:
        ValueError: If an invalid aggregation method is provided.
        TypeError: If df is not a pandas DataFrame.

    Examples:
        >>> import pandas as pd
        >>> df = pd.DataFrame({'A': [1, 2, 3], 'B': [4, 5, 6]})
        >>> transform_dataframe(df, columns=['A'], aggregation='mean')
             A
        0  2.0
    """
    # Function implementation here
    pass

Comments

# TODO(JIRA-1234): Optimize this function for large datasets
def process_large_data(data):
    # Using a dictionary for O(1) lookups instead of a list
    # because performance testing showed significant improvement
    result_map = {}

    # Rest of implementation...

Testing

All Python code should have comprehensive tests:

Test Framework

import pytest
import pandas as pd
from causeway.data import transform_dataframe

@pytest.fixture
def sample_dataframe():
    """Create a sample DataFrame for testing."""
    return pd.DataFrame({
        'A': [1, 2, 3],
        'B': [4, 5, 6]
    })

def test_transform_dataframe_with_mean(sample_dataframe):
    """Test that transform_dataframe correctly calculates the mean."""
    result = transform_dataframe(sample_dataframe, aggregation='mean')
    assert result['A'].iloc[0] == 2
    assert result['B'].iloc[0] == 5

def test_transform_dataframe_with_invalid_agg(sample_dataframe):
    """Test that transform_dataframe raises error with invalid aggregation."""
    with pytest.raises(ValueError):
        transform_dataframe(sample_dataframe, aggregation='invalid')

Coverage Requirements

Measure coverage with pytest-cov (for example, pytest --cov=project_name --cov-report=term-missing) and publish the report in CI. Tests should exercise both success paths and the error cases documented in each function's Raises section.

Error Handling

Use Specific Exceptions

Raise and catch specific exceptions that clearly communicate what went wrong. Avoid generic exceptions like Exception.

Instead of:

try:
    process_data(data)
except Exception as e:
    print(f"Error: {e}")
    return None

Use:

try:
    process_data(data)
except ValueError as e:
    logger.error(f"Invalid data format: {e}")
    raise
except IOError as e:
    logger.error(f"I/O error in processing: {e}")
    return None

Define Custom Exceptions

Create custom exception classes for application-specific errors, inheriting from appropriate base exceptions.

class DataProcessingError(Exception):
    """Base class for all data processing errors."""
    pass

class InvalidDataFormatError(DataProcessingError):
    """Raised when the data format is invalid."""
    pass

class DataQualityError(DataProcessingError):
    """Raised when data quality checks fail."""
    def __init__(self, message, failed_checks=None):
        super().__init__(message)
        self.failed_checks = failed_checks or []
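
Catching the base class handles any subclass, while catching a specific subclass lets the caller inspect its extra context. A short usage sketch with the classes repeated so it is self-contained (the check names and run_quality_checks helper are hypothetical):

```python
class DataProcessingError(Exception):
    """Base class for all data processing errors."""


class DataQualityError(DataProcessingError):
    """Raised when data quality checks fail."""
    def __init__(self, message, failed_checks=None):
        super().__init__(message)
        self.failed_checks = failed_checks or []


def run_quality_checks(row):
    # Collect every failed check first, then raise once with the full list
    failed = []
    if row.get("amount", 0) < 0:
        failed.append("non_negative_amount")
    if not row.get("user_id"):
        failed.append("user_id_present")
    if failed:
        raise DataQualityError("Quality checks failed", failed_checks=failed)
```

A caller can then write except DataProcessingError to handle any processing failure, and read e.failed_checks when the error is a DataQualityError.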

Log Appropriately

Use proper logging with appropriate log levels rather than print statements.

import logging

logger = logging.getLogger(__name__)

def process_data(data):
    try:
        logger.debug(f"Processing data batch: {len(data)} records")
        result = transform(data)
        logger.info(f"Successfully processed {len(data)} records")
        return result
    except ValueError as e:
        logger.error(f"Invalid data encountered: {e}", exc_info=True)
        raise DataQualityError(f"Failed to process data: {e}") from e
    except Exception as e:
        logger.critical(f"Unexpected error in data processing: {e}", exc_info=True)
        raise

Common Patterns

Context Managers

Use context managers (with statements) for resource management.

def process_large_file(file_path):
    with open(file_path, 'r') as file:
        # File automatically closed after this block
        for line in file:
            process_line(line)

# Custom context manager example
class DatabaseConnection:
    def __init__(self, connection_string):
        self.connection_string = connection_string
        self.connection = None

    def __enter__(self):
        self.connection = connect_to_db(self.connection_string)
        return self.connection

    def __exit__(self, exc_type, exc_val, exc_tb):
        if self.connection:
            self.connection.close()
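
For simple cases the standard library's contextlib.contextmanager decorator avoids writing a full class. A sketch using sqlite3 as a stand-in for a real database client:

```python
from contextlib import contextmanager
import sqlite3


@contextmanager
def database_connection(connection_string):
    """Yield an open connection; guarantee close() even if the block raises."""
    connection = sqlite3.connect(connection_string)
    try:
        yield connection
    finally:
        connection.close()


# Usage: the connection is closed automatically on exit
with database_connection(":memory:") as conn:
    conn.execute("CREATE TABLE events (id INTEGER)")
```

The try/finally around the yield plays the same role as __exit__ in the class-based version.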

Generators

Use generators for memory-efficient processing of large datasets.

def read_large_file(file_path, chunk_size=1000):
    """Read a large file line by line, yielding chunks of lines."""
    current_chunk = []

    with open(file_path, 'r') as file:
        for i, line in enumerate(file):
            current_chunk.append(line)
            if (i + 1) % chunk_size == 0:
                yield current_chunk
                current_chunk = []

        # Yield the last chunk if it's not empty
        if current_chunk:
            yield current_chunk
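
The same chunking pattern works for any iterable, not just files; a generalized sketch (the chunked helper is hypothetical):

```python
def chunked(iterable, chunk_size=1000):
    """Yield lists of up to chunk_size items without materializing the input."""
    current_chunk = []
    for item in iterable:
        current_chunk.append(item)
        if len(current_chunk) == chunk_size:
            yield current_chunk
            current_chunk = []
    # Yield the final partial chunk, if any
    if current_chunk:
        yield current_chunk


# Usage: only one chunk is held in memory at a time
total = sum(sum(chunk) for chunk in chunked(range(10), chunk_size=4))
```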

Dependency Injection

Use dependency injection to make code more testable and flexible.

class DataProcessor:
    def __init__(self, storage_client, transformer=None, logger=None):
        """Initialize with injected dependencies.

        Args:
            storage_client: Client for storage operations
            transformer: Optional transformer for data
            logger: Optional logger instance
        """
        self.storage_client = storage_client
        self.transformer = transformer or DefaultTransformer()
        self.logger = logger or logging.getLogger(__name__)

    def process(self, data_id):
        """Process data by ID."""
        self.logger.info(f"Processing data ID: {data_id}")
        raw_data = self.storage_client.get_data(data_id)
        processed_data = self.transformer.transform(raw_data)
        return processed_data
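
Injection pays off in tests, where lightweight fakes replace the real services. A self-contained sketch (DataProcessor is repeated in condensed form; FakeStorageClient and UpperTransformer are hypothetical test doubles):

```python
import logging


class DataProcessor:
    """Condensed version of the class above, repeated for a runnable sketch."""
    def __init__(self, storage_client, transformer, logger=None):
        self.storage_client = storage_client
        self.transformer = transformer
        self.logger = logger or logging.getLogger(__name__)

    def process(self, data_id):
        raw_data = self.storage_client.get_data(data_id)
        return self.transformer.transform(raw_data)


class FakeStorageClient:
    """In-memory test double; no network or credentials needed."""
    def __init__(self, records):
        self.records = records

    def get_data(self, data_id):
        return self.records[data_id]


class UpperTransformer:
    """Trivial transformer used only to observe the call path."""
    def transform(self, raw_data):
        return raw_data.upper()
```

Because the dependencies arrive through the constructor, the test never touches real storage.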

Anti-Patterns

Global State

Avoid using global variables as they make code harder to test and understand.

Avoid:

# Global variables
CONFIGURATION = {}
DATA_CACHE = {}

def load_config():
    global CONFIGURATION
    CONFIGURATION = load_from_file()

def process_data(data_id):
    global DATA_CACHE
    if data_id in DATA_CACHE:
        return DATA_CACHE[data_id]
    # Process and cache...

Better approach:

class Configuration:
    def __init__(self):
        self.settings = {}

    def load(self):
        self.settings = load_from_file()

class DataProcessor:
    def __init__(self, config, cache=None):
        self.config = config
        self.cache = cache or {}

    def process_data(self, data_id):
        if data_id in self.cache:
            return self.cache[data_id]
        # Process and cache...

Ignoring Type Hints

Always use type hints for better code readability and static analysis.

Avoid:

def process_data(data, options=None):
    if options is None:
        options = {}
    # Process data...

Better approach:

from typing import Dict, List, Optional, Any

def process_data(
    data: List[Dict[str, Any]],
    options: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
    if options is None:
        options = {}
    # Process data...

Overusing Classes

Don't use classes when simple functions would suffice.

Avoid:

class Adder:
    def __init__(self, x):
        self.x = x

    def add(self, y):
        return self.x + y

# Usage
adder = Adder(5)
result = adder.add(10)  # result = 15

Better approach:

def add(x, y):
    return x + y

# Usage
result = add(5, 10)  # result = 15

Examples

Example Data Pipeline

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Data Pipeline Example
====================

This module demonstrates a well-structured data processing pipeline
following Causeway Python standards.
"""
import logging
from typing import Dict, List, Optional, Union
import pandas as pd

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class DataQualityError(Exception):
    """Raised when data quality checks fail."""
    def __init__(self, message: str, failed_checks: Optional[List[str]] = None):
        super().__init__(message)
        self.failed_checks = failed_checks or []


class DataProcessor:
    """Processes data according to provided configuration.

    This class encapsulates all data processing logic with configurable
    transformations and validation rules.
    """

    def __init__(self, config: Dict[str, Union[str, int, float]]):
        """Initialize processor with configuration.

        Args:
            config: Configuration dictionary containing processing parameters
        """
        self.config = config
        logger.info(f"Initialized DataProcessor with config: {config}")

    def _validate_data(self, df: pd.DataFrame) -> List[str]:
        """Validate the data quality.

        Args:
            df: DataFrame to validate

        Returns:
            List of failed validation checks, empty if all passed
        """
        failed_checks = []

        # Check for missing values in required columns
        required_columns = self.config.get('required_columns', [])
        for col in required_columns:
            if col in df.columns and df[col].isna().any():
                failed_checks.append(f"Missing values in required column: {col}")

        # Check for duplicates if required
        if self.config.get('check_duplicates', False):
            id_column = self.config.get('id_column', 'id')
            if id_column in df.columns and df[id_column].duplicated().any():
                failed_checks.append(f"Duplicate values in ID column: {id_column}")

        logger.debug(f"Validation complete: {len(failed_checks)} checks failed")
        return failed_checks

    def process(self, df: pd.DataFrame) -> pd.DataFrame:
        """Process the input DataFrame.

        Args:
            df: Input DataFrame to process

        Returns:
            Processed DataFrame

        Raises:
            DataQualityError: If data validation fails
            ValueError: If the input DataFrame is empty
        """
        if df.empty:
            raise ValueError("Cannot process empty DataFrame")

        logger.info(f"Processing DataFrame with {len(df)} rows and {len(df.columns)} columns")

        # Validate data
        failed_checks = self._validate_data(df)
        if failed_checks:
            logger.error(f"Data validation failed: {failed_checks}")
            raise DataQualityError("Data validation failed", failed_checks)

        # Apply transformations
        result = df.copy()

        # Filter columns if specified
        if 'columns' in self.config:
            columns = self.config['columns']
            result = result[columns]
            logger.debug(f"Filtered to {len(columns)} columns")

        # Apply aggregation if specified
        if 'aggregation' in self.config:
            agg_func = self.config['aggregation']
            group_by = self.config.get('group_by', [])

            if group_by:
                result = result.groupby(group_by).agg(agg_func).reset_index()
                logger.debug(f"Applied {agg_func} aggregation grouped by {group_by}")

        logger.info(f"Processing complete: resulting DataFrame has {len(result)} rows")
        return result


def load_dataset(file_path: str) -> pd.DataFrame:
    """Load dataset from file.

    Args:
        file_path: Path to the data file

    Returns:
        Loaded DataFrame
    """
    logger.info(f"Loading data from {file_path}")
    return pd.read_csv(file_path)


def main():
    """Main entry point for the data pipeline."""
    try:
        # Configuration
        config = {
            'required_columns': ['user_id', 'transaction_amount'],
            'check_duplicates': True,
            'id_column': 'transaction_id',
            'columns': ['user_id', 'transaction_amount', 'transaction_date'],
            'aggregation': 'sum',
            'group_by': ['user_id']
        }

        # Initialize processor
        processor = DataProcessor(config)

        # Load and process data
        df = load_dataset('transactions.csv')
        result = processor.process(df)

        # Save results
        result.to_csv('processed_transactions.csv', index=False)
        logger.info("Pipeline executed successfully")

    except (DataQualityError, ValueError) as e:
        logger.error(f"Pipeline failed: {e}")
        return 1
    except Exception as e:
        logger.critical(f"Unexpected error: {e}", exc_info=True)
        return 2

    return 0


if __name__ == "__main__":
    raise SystemExit(main())

Example Test Suite

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Tests for Data Pipeline
======================

Unit tests for the data processing pipeline.
"""
import pytest
import pandas as pd
import numpy as np
from data_pipeline import DataProcessor, DataQualityError


@pytest.fixture
def sample_data():
    """Create sample data for testing."""
    return pd.DataFrame({
        'user_id': [1, 2, 3, 4, 5],
        'transaction_id': [101, 102, 103, 104, 105],
        'transaction_amount': [100.0, 200.0, 150.0, 300.0, 250.0],
        'transaction_date': ['2023-01-01', '2023-01-02', '2023-01-01',
                             '2023-01-03', '2023-01-02']
    })


@pytest.fixture
def config():
    """Create a standard configuration for testing."""
    return {
        'required_columns': ['user_id', 'transaction_amount'],
        'check_duplicates': True,
        'id_column': 'transaction_id',
        'columns': ['user_id', 'transaction_amount', 'transaction_date'],
        'aggregation': 'sum',
        'group_by': ['user_id']
    }


class TestDataProcessor:
    """Tests for the DataProcessor class."""

    def test_processor_initialization(self, config):
        """Test that the processor initializes correctly."""
        processor = DataProcessor(config)
        assert processor.config == config

    def test_valid_data_processing(self, sample_data, config):
        """Test processing with valid data."""
        processor = DataProcessor(config)
        result = processor.process(sample_data)

        # Check that aggregation worked correctly
        assert len(result) == 5  # Each user_id is unique
        assert 'transaction_amount' in result.columns
        assert set(result['user_id']) == {1, 2, 3, 4, 5}

    def test_empty_dataframe(self, config):
        """Test that processing an empty DataFrame raises a ValueError."""
        processor = DataProcessor(config)
        with pytest.raises(ValueError, match="Cannot process empty DataFrame"):
            processor.process(pd.DataFrame())

    def test_missing_values_validation(self, sample_data, config):
        """Test validation for missing values."""
        # Create a copy with NaN values
        df_with_missing = sample_data.copy()
        df_with_missing.loc[0, 'transaction_amount'] = np.nan

        processor = DataProcessor(config)
        with pytest.raises(DataQualityError) as exc_info:
            processor.process(df_with_missing)

        assert "Data validation failed" in str(exc_info.value)
        assert "Missing values in required column: transaction_amount" in exc_info.value.failed_checks

    def test_duplicate_detection(self, sample_data, config):
        """Test detection of duplicate IDs."""
        # Create a copy with duplicate transaction IDs
        df_with_duplicates = sample_data.copy()
        df_with_duplicates.loc[5] = [3, 103, 175.0, '2023-01-04']  # Duplicate transaction_id

        processor = DataProcessor(config)
        with pytest.raises(DataQualityError) as exc_info:
            processor.process(df_with_duplicates)

        assert "Data validation failed" in str(exc_info.value)
        assert "Duplicate values in ID column: transaction_id" in exc_info.value.failed_checks

    def test_column_filtering(self, sample_data, config):
        """Test that only specified columns are included in result."""
        # Modify config to select only two columns
        config_subset = config.copy()
        config_subset['columns'] = ['user_id', 'transaction_amount']
        config_subset.pop('aggregation', None)
        config_subset.pop('group_by', None)

        processor = DataProcessor(config_subset)
        result = processor.process(sample_data)

        assert set(result.columns) == {'user_id', 'transaction_amount'}
        assert 'transaction_date' not in result.columns
        assert len(result) == len(sample_data)

    def test_group_aggregation(self, config):
        """Test grouping and aggregation functionality."""
        # Create data with duplicate user_ids to test aggregation
        df = pd.DataFrame({
            'user_id': [1, 1, 2, 2, 3],
            'transaction_id': [101, 102, 103, 104, 105],
            'transaction_amount': [100.0, 150.0, 200.0, 250.0, 300.0],
            'transaction_date': ['2023-01-01', '2023-01-02', '2023-01-01',
                                '2023-01-03', '2023-01-02']
        })

        processor = DataProcessor(config)
        result = processor.process(df)

        # Check aggregation results
        assert len(result) == 3  # 3 unique user_ids
        assert result.loc[result['user_id'] == 1, 'transaction_amount'].iloc[0] == 250.0  # 100 + 150
        assert result.loc[result['user_id'] == 2, 'transaction_amount'].iloc[0] == 450.0  # 200 + 250
        assert result.loc[result['user_id'] == 3, 'transaction_amount'].iloc[0] == 300.0

Note: These Python standards should be enforced through automated tools in your CI/CD pipeline:

  • Black - code formatting
  • isort - import sorting
  • flake8 - style guide enforcement
  • mypy - static type checking
  • pytest - unit and integration testing
  • pytest-cov - test coverage measurement
  • pre-commit - Git hooks to enforce standards