Python Standards
This document outlines the Python standards and best practices for the Causeway data platform. Following these standards ensures consistency, maintainability, and quality across all Python projects and scripts.
Consistent Python standards are critical for building reliable data engineering solutions. These guidelines will help ensure your Python code is well-structured, maintainable, and follows industry best practices, whether you're developing ETL pipelines, data analysis scripts, or data processing applications.
Code Style
We follow PEP 8 for our Python code style with some additional requirements:
Formatting
- Use 4 spaces for indentation (no tabs)
- Maximum line length of 88 characters (Black formatter default)
- Use Black for automatic code formatting
- Use isort for organizing imports
- End files with a single newline
- Use UTF-8 encoding for Python source files
```bash
# Required development tools
pip install black isort flake8 mypy
```
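To keep formatting consistent across machines, tool settings can live in `pyproject.toml`. A minimal sketch (the exact options are up to each project):

```toml
[tool.black]
line-length = 88

[tool.isort]
profile = "black"
```

The `profile = "black"` setting makes isort's import grouping compatible with Black's formatting so the two tools don't fight each other.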
Import Conventions
Organize imports in the following order:
- Standard library imports
- Related third-party imports
- Local application/library specific imports
```python
# Standard library imports
import os
import sys
from datetime import datetime

# Third-party imports
import numpy as np
import pandas as pd
from pyspark.sql import SparkSession

# Local application imports
from causeway.utils import logger
from causeway.data import transformations
```
Naming Conventions
| Type | Convention | Example |
|---|---|---|
| Variables | Snake case | user_count, total_amount |
| Functions | Snake case | calculate_total(), transform_data() |
| Classes | Pascal case (CapWords) | DataProcessor, UserManager |
| Constants | Uppercase with underscores | MAX_CONNECTIONS, DEFAULT_TIMEOUT |
| Modules | Short, snake case | data_processors.py, utils.py |
| Packages | Short, snake case, no underscores | dataprocessors, utils |
| Internal ("protected") variables/methods | Prefix with single underscore (convention only) | _internal_count, _calculate_hash() |
| Private variables/methods | Prefix with double underscore (triggers name mangling) | __sensitive_data, __secure_method() |
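Note that a single leading underscore is purely a convention, while a double leading underscore changes the attribute's actual name via Python's name mangling. A small sketch (the class and attribute names here are invented):

```python
class Account:
    def __init__(self):
        self._balance = 0        # convention: internal use only, still accessible
        self.__token = "secret"  # name-mangled to _Account__token

acct = Account()
print(acct._balance)             # 0 — accessible, but callers should not rely on it
print(hasattr(acct, "__token"))  # False — the attribute was mangled
print(acct._Account__token)      # "secret" — the mangled name still exists
```

Name mangling is intended to avoid accidental clashes in subclasses, not to provide real access control, so prefer the single-underscore convention unless you specifically need mangling.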
Descriptive Naming
- Use descriptive names that clearly indicate the purpose
- Avoid single-letter variable names except for very short blocks (loops, etc.)
- Prioritize clarity over brevity
- Avoid generic names like `data`, `result`, or `temp` without additional context
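As a quick before-and-after illustration (the function and names here are invented):

```python
# Too generic: what data, what result?
def process(data):
    result = [x * 1.2 for x in data]
    return result

# Descriptive: the purpose is clear at the call site
def apply_tax_markup(net_prices):
    gross_prices = [price * 1.2 for price in net_prices]
    return gross_prices
```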
Project Structure
Follow this standard project structure for all Python projects:
```
project_name/
├── README.md
├── requirements.txt
├── setup.py
├── .gitignore
├── .pre-commit-config.yaml
├── docs/
│   ├── index.md
│   └── api.md
├── tests/
│   ├── __init__.py
│   ├── test_module1.py
│   └── test_module2.py
├── project_name/
│   ├── __init__.py
│   ├── main.py
│   ├── module1.py
│   ├── module2.py
│   └── utils/
│       ├── __init__.py
│       └── helpers.py
└── notebooks/
    └── exploratory_analysis.ipynb
```
Key Structure Elements
- Package name: Should match the project name and be importable
- README.md: Essential project documentation
- requirements.txt: Dependencies with pinned versions
- setup.py: Package installation configuration
- tests/: Test modules that mirror the package structure
- docs/: Comprehensive documentation
- notebooks/: Exploratory or example Jupyter notebooks
Documentation
Docstrings
Use PEP 257 docstrings with Google style format:
```python
def transform_dataframe(df, columns=None, aggregation='sum'):
    """Transform a DataFrame by selecting columns and applying aggregation.

    Args:
        df (pandas.DataFrame): The input DataFrame to transform.
        columns (list, optional): List of column names to include.
            If None, all columns are used. Defaults to None.
        aggregation (str, optional): Type of aggregation to apply.
            Options are 'sum', 'mean', 'max', 'min'. Defaults to 'sum'.

    Returns:
        pandas.DataFrame: The transformed DataFrame.

    Raises:
        ValueError: If an invalid aggregation method is provided.
        TypeError: If df is not a pandas DataFrame.

    Examples:
        >>> import pandas as pd
        >>> df = pd.DataFrame({'A': [1, 2, 3], 'B': [4, 5, 6]})
        >>> transform_dataframe(df, columns=['A'], aggregation='mean')
             A
        0  2.0
    """
    # Function implementation here
    pass
```
Comments
- Use comments sparingly to explain "why" rather than "what"
- Keep comments up-to-date with code changes
- Complex or non-obvious code should have explanatory comments
- Use TODO comments for planned improvements with ticket references
```python
# TODO(JIRA-1234): Optimize this function for large datasets
def process_large_data(data):
    # Using a dictionary for O(1) lookups instead of a list
    # because performance testing showed significant improvement
    result_map = {}
    # Rest of implementation...
```
Testing
All Python code should have comprehensive tests:
Test Framework
- Use pytest as the testing framework
- Organize tests to mirror the structure of the code being tested
- Name test files with a `test_` prefix
- Name test functions with a `test_` prefix
- Use fixtures for common setup and teardown logic
- Include both unit and integration tests
```python
import pytest
import pandas as pd

from causeway.data import transform_dataframe


@pytest.fixture
def sample_dataframe():
    """Create a sample DataFrame for testing."""
    return pd.DataFrame({
        'A': [1, 2, 3],
        'B': [4, 5, 6]
    })


def test_transform_dataframe_with_mean(sample_dataframe):
    """Test that transform_dataframe correctly calculates the mean."""
    result = transform_dataframe(sample_dataframe, aggregation='mean')
    assert result['A'].iloc[0] == 2
    assert result['B'].iloc[0] == 5


def test_transform_dataframe_with_invalid_agg(sample_dataframe):
    """Test that transform_dataframe raises error with invalid aggregation."""
    with pytest.raises(ValueError):
        transform_dataframe(sample_dataframe, aggregation='invalid')
```
Coverage Requirements
- Minimum test coverage: 80% for all code
- Critical data processing components: 95% coverage
- Use pytest-cov to measure coverage
- Configure CI/CD to fail if coverage falls below thresholds
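One way to wire the thresholds into local and CI runs is via pytest configuration; a sketch using pytest-cov's `--cov-fail-under` flag (the package name `project_name` is a placeholder):

```ini
# setup.cfg (or equivalent pytest.ini section)
[tool:pytest]
addopts = --cov=project_name --cov-fail-under=80
```

With this in place, any `pytest` invocation fails the build when overall coverage drops below 80%.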
Error Handling
Use Specific Exceptions
Raise and catch specific exceptions that clearly communicate what went wrong. Avoid generic exceptions like Exception.
Instead of:
```python
try:
    process_data(data)
except Exception as e:
    print(f"Error: {e}")
    return None
```
Use:
```python
try:
    process_data(data)
except ValueError as e:
    logger.error(f"Invalid data format: {e}")
    raise
except IOError as e:
    logger.error(f"I/O error in processing: {e}")
    return None
```
Define Custom Exceptions
Create custom exception classes for application-specific errors, inheriting from appropriate base exceptions.
```python
class DataProcessingError(Exception):
    """Base class for all data processing errors."""
    pass


class InvalidDataFormatError(DataProcessingError):
    """Raised when the data format is invalid."""
    pass


class DataQualityError(DataProcessingError):
    """Raised when data quality checks fail."""

    def __init__(self, message, failed_checks=None):
        super().__init__(message)
        self.failed_checks = failed_checks or []
```
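A hierarchy like this lets callers catch at whatever level of specificity suits them, since catching the base class also catches every subclass. A brief, self-contained sketch (the `run_quality_checks` function and its single check are invented for illustration):

```python
class DataProcessingError(Exception):
    """Base class for all data processing errors."""


class DataQualityError(DataProcessingError):
    """Raised when data quality checks fail."""

    def __init__(self, message, failed_checks=None):
        super().__init__(message)
        self.failed_checks = failed_checks or []


def run_quality_checks(row):
    # Collect the names of any checks that fail
    failed = [name for name, ok in [("non_negative", row["amount"] >= 0)] if not ok]
    if failed:
        raise DataQualityError("Quality checks failed", failed_checks=failed)


try:
    run_quality_checks({"amount": -5})
except DataProcessingError as e:  # base class catches the subclass too
    print(type(e).__name__, e.failed_checks)  # DataQualityError ['non_negative']
```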
Log Appropriately
Use proper logging with appropriate log levels rather than print statements.
```python
import logging

logger = logging.getLogger(__name__)


def process_data(data):
    try:
        logger.debug(f"Processing data batch: {len(data)} records")
        result = transform(data)
        logger.info(f"Successfully processed {len(data)} records")
        return result
    except ValueError as e:
        logger.error(f"Invalid data encountered: {e}", exc_info=True)
        raise DataQualityError(f"Failed to process data: {e}") from e
    except Exception as e:
        logger.critical(f"Unexpected error in data processing: {e}", exc_info=True)
        raise
```
Common Patterns
Context Managers
Use context managers (with statements) for resource management.
```python
def process_large_file(file_path):
    with open(file_path, 'r') as file:
        # File automatically closed after this block
        for line in file:
            process_line(line)


# Custom context manager example
class DatabaseConnection:
    def __init__(self, connection_string):
        self.connection_string = connection_string
        self.connection = None

    def __enter__(self):
        self.connection = connect_to_db(self.connection_string)
        return self.connection

    def __exit__(self, exc_type, exc_val, exc_tb):
        if self.connection:
            self.connection.close()
```
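For simple cases, `contextlib.contextmanager` avoids writing a full class with `__enter__`/`__exit__`. A sketch using an in-memory stand-in for a real resource:

```python
from contextlib import contextmanager


@contextmanager
def managed_resource(name):
    resource = {"name": name, "open": True}  # stand-in for a real connection
    try:
        yield resource
    finally:
        resource["open"] = False  # cleanup runs even if the body raises


with managed_resource("db") as res:
    assert res["open"]
print(res["open"])  # False: cleanup ran on exit
```

Code before the `yield` plays the role of `__enter__`, and the `finally` block plays the role of `__exit__`.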
Generators
Use generators for memory-efficient processing of large datasets.
```python
def read_large_file(file_path, chunk_size=1000):
    """Read a large file line by line, yielding chunks of lines."""
    current_chunk = []
    with open(file_path, 'r') as file:
        for i, line in enumerate(file):
            current_chunk.append(line)
            if (i + 1) % chunk_size == 0:
                yield current_chunk
                current_chunk = []
    # Yield the last chunk if it's not empty
    if current_chunk:
        yield current_chunk
```
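The same chunking pattern works for any iterable, which makes it easy to test without touching the filesystem. A sketch (the `chunked` helper is a generalized variant, not part of the codebase):

```python
def chunked(iterable, chunk_size):
    """Yield lists of up to chunk_size items from any iterable."""
    chunk = []
    for item in iterable:
        chunk.append(item)
        if len(chunk) == chunk_size:
            yield chunk
            chunk = []
    if chunk:
        yield chunk


# Each chunk is processed and discarded, so memory stays bounded
totals = [sum(c) for c in chunked(range(10), 4)]
print(totals)  # [6, 22, 17]
```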
Dependency Injection
Use dependency injection to make code more testable and flexible.
```python
class DataProcessor:
    def __init__(self, storage_client, transformer=None, logger=None):
        """Initialize with injected dependencies.

        Args:
            storage_client: Client for storage operations
            transformer: Optional transformer for data
            logger: Optional logger instance
        """
        self.storage_client = storage_client
        self.transformer = transformer or DefaultTransformer()
        self.logger = logger or logging.getLogger(__name__)

    def process(self, data_id):
        """Process data by ID."""
        self.logger.info(f"Processing data ID: {data_id}")
        raw_data = self.storage_client.get_data(data_id)
        processed_data = self.transformer.transform(raw_data)
        return processed_data
```
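The payoff shows up in tests, where fakes can be injected in place of real services. A self-contained sketch along the lines of the processor above (the fake client and `TimesTenTransformer` are invented test doubles):

```python
import logging


class FakeStorageClient:
    """Test double that returns canned data instead of hitting real storage."""

    def get_data(self, data_id):
        return [1, 2, 3]


class TimesTenTransformer:
    """Hypothetical transformer chosen so the output is easy to assert on."""

    def transform(self, raw_data):
        return [x * 10 for x in raw_data]


class DataProcessor:
    def __init__(self, storage_client, transformer, logger=None):
        self.storage_client = storage_client
        self.transformer = transformer
        self.logger = logger or logging.getLogger(__name__)

    def process(self, data_id):
        raw_data = self.storage_client.get_data(data_id)
        return self.transformer.transform(raw_data)


processor = DataProcessor(FakeStorageClient(), TimesTenTransformer())
print(processor.process("any-id"))  # [10, 20, 30]
```

Because the dependencies arrive through the constructor, no patching or network access is needed to exercise `process`.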
Anti-Patterns
Global State
Avoid using global variables as they make code harder to test and understand.
Avoid:
```python
# Global variables
CONFIGURATION = {}
DATA_CACHE = {}


def load_config():
    global CONFIGURATION
    CONFIGURATION = load_from_file()


def process_data(data_id):
    global DATA_CACHE
    if data_id in DATA_CACHE:
        return DATA_CACHE[data_id]
    # Process and cache...
```
Better approach:
```python
class Configuration:
    def __init__(self):
        self.settings = {}

    def load(self):
        self.settings = load_from_file()


class DataProcessor:
    def __init__(self, config, cache=None):
        self.config = config
        self.cache = cache or {}

    def process_data(self, data_id):
        if data_id in self.cache:
            return self.cache[data_id]
        # Process and cache...
```
Ignoring Type Hints
Always use type hints for better code readability and static analysis.
Avoid:
```python
def process_data(data, options=None):
    if options is None:
        options = {}
    # Process data...
```
Better approach:
```python
from typing import Any, Dict, List, Optional


def process_data(
    data: List[Dict[str, Any]],
    options: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
    if options is None:
        options = {}
    # Process data...
```
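On newer Python versions, built-in generics (`list`, `dict`) and the `X | None` union syntax can replace most `typing` imports; adding `from __future__ import annotations` keeps such signatures valid even on older interpreters. A sketch (the function body is invented to make the example runnable):

```python
from __future__ import annotations


def process_data(
    data: list[dict[str, object]],
    options: dict[str, object] | None = None,
) -> dict[str, object]:
    if options is None:
        options = {}
    return {"rows": len(data), **options}


print(process_data([{"a": 1}], {"source": "test"}))  # {'rows': 1, 'source': 'test'}
```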
Overusing Classes
Don't use classes when simple functions would suffice.
Avoid:
```python
class Adder:
    def __init__(self, x):
        self.x = x

    def add(self, y):
        return self.x + y


# Usage
adder = Adder(5)
result = adder.add(10)  # result = 15
```
Better approach:
```python
def add(x, y):
    return x + y


# Usage
result = add(5, 10)  # result = 15
```
Examples
Example Data Pipeline
```python
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Data Pipeline Example
=====================

This module demonstrates a well-structured data processing pipeline
following Causeway Python standards.
"""
import logging
import sys
from typing import Dict, List, Optional, Union

import pandas as pd

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class DataQualityError(Exception):
    """Raised when data quality checks fail."""

    def __init__(self, message: str, failed_checks: Optional[List[str]] = None):
        super().__init__(message)
        self.failed_checks = failed_checks or []


class DataProcessor:
    """Processes data according to provided configuration.

    This class encapsulates all data processing logic with configurable
    transformations and validation rules.
    """

    def __init__(self, config: Dict[str, Union[str, int, float]]):
        """Initialize processor with configuration.

        Args:
            config: Configuration dictionary containing processing parameters
        """
        self.config = config
        logger.info(f"Initialized DataProcessor with config: {config}")

    def _validate_data(self, df: pd.DataFrame) -> List[str]:
        """Validate the data quality.

        Args:
            df: DataFrame to validate

        Returns:
            List of failed validation checks, empty if all passed
        """
        failed_checks = []

        # Check for missing values in required columns
        required_columns = self.config.get('required_columns', [])
        for col in required_columns:
            if col in df.columns and df[col].isna().any():
                failed_checks.append(f"Missing values in required column: {col}")

        # Check for duplicates if required
        if self.config.get('check_duplicates', False):
            id_column = self.config.get('id_column', 'id')
            if id_column in df.columns and df[id_column].duplicated().any():
                failed_checks.append(f"Duplicate values in ID column: {id_column}")

        logger.debug(f"Validation complete: {len(failed_checks)} checks failed")
        return failed_checks

    def process(self, df: pd.DataFrame) -> pd.DataFrame:
        """Process the input DataFrame.

        Args:
            df: Input DataFrame to process

        Returns:
            Processed DataFrame

        Raises:
            DataQualityError: If data validation fails
            ValueError: If the input DataFrame is empty
        """
        if df.empty:
            raise ValueError("Cannot process empty DataFrame")

        logger.info(
            f"Processing DataFrame with {len(df)} rows and {len(df.columns)} columns"
        )

        # Validate data
        failed_checks = self._validate_data(df)
        if failed_checks:
            logger.error(f"Data validation failed: {failed_checks}")
            raise DataQualityError("Data validation failed", failed_checks)

        # Apply transformations
        result = df.copy()

        # Filter columns if specified
        if 'columns' in self.config:
            columns = self.config['columns']
            result = result[columns]
            logger.debug(f"Filtered to {len(columns)} columns")

        # Apply aggregation if specified
        if 'aggregation' in self.config:
            agg_func = self.config['aggregation']
            group_by = self.config.get('group_by', [])
            if group_by:
                result = result.groupby(group_by).agg(agg_func).reset_index()
                logger.debug(f"Applied {agg_func} aggregation grouped by {group_by}")

        logger.info(f"Processing complete: resulting DataFrame has {len(result)} rows")
        return result


def load_dataset(file_path: str) -> pd.DataFrame:
    """Load dataset from file.

    Args:
        file_path: Path to the data file

    Returns:
        Loaded DataFrame
    """
    logger.info(f"Loading data from {file_path}")
    return pd.read_csv(file_path)


def main():
    """Main entry point for the data pipeline."""
    try:
        # Configuration
        config = {
            'required_columns': ['user_id', 'transaction_amount'],
            'check_duplicates': True,
            'id_column': 'transaction_id',
            'columns': ['user_id', 'transaction_amount', 'transaction_date'],
            'aggregation': 'sum',
            'group_by': ['user_id']
        }

        # Initialize processor
        processor = DataProcessor(config)

        # Load and process data
        df = load_dataset('transactions.csv')
        result = processor.process(df)

        # Save results
        result.to_csv('processed_transactions.csv', index=False)
        logger.info("Pipeline executed successfully")

    except (DataQualityError, ValueError) as e:
        logger.error(f"Pipeline failed: {e}")
        return 1
    except Exception as e:
        logger.critical(f"Unexpected error: {e}", exc_info=True)
        return 2

    return 0


if __name__ == "__main__":
    sys.exit(main())
```
Example Test Suite
```python
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Tests for Data Pipeline
=======================

Unit tests for the data processing pipeline.
"""
import numpy as np
import pandas as pd
import pytest

from data_pipeline import DataProcessor, DataQualityError


@pytest.fixture
def sample_data():
    """Create sample data for testing."""
    return pd.DataFrame({
        'user_id': [1, 2, 3, 4, 5],
        'transaction_id': [101, 102, 103, 104, 105],
        'transaction_amount': [100.0, 200.0, 150.0, 300.0, 250.0],
        'transaction_date': ['2023-01-01', '2023-01-02', '2023-01-01',
                             '2023-01-03', '2023-01-02']
    })


@pytest.fixture
def config():
    """Create a standard configuration for testing."""
    return {
        'required_columns': ['user_id', 'transaction_amount'],
        'check_duplicates': True,
        'id_column': 'transaction_id',
        'columns': ['user_id', 'transaction_amount', 'transaction_date'],
        'aggregation': 'sum',
        'group_by': ['user_id']
    }


class TestDataProcessor:
    """Tests for the DataProcessor class."""

    def test_processor_initialization(self, config):
        """Test that the processor initializes correctly."""
        processor = DataProcessor(config)
        assert processor.config == config

    def test_valid_data_processing(self, sample_data, config):
        """Test processing with valid data."""
        processor = DataProcessor(config)
        result = processor.process(sample_data)

        # Check that aggregation worked correctly
        assert len(result) == 5  # Each user_id is unique
        assert 'transaction_amount' in result.columns
        assert set(result['user_id']) == {1, 2, 3, 4, 5}

    def test_empty_dataframe(self, config):
        """Test that processing an empty DataFrame raises a ValueError."""
        processor = DataProcessor(config)
        with pytest.raises(ValueError, match="Cannot process empty DataFrame"):
            processor.process(pd.DataFrame())

    def test_missing_values_validation(self, sample_data, config):
        """Test validation for missing values."""
        # Create a copy with NaN values
        df_with_missing = sample_data.copy()
        df_with_missing.loc[0, 'transaction_amount'] = np.nan

        processor = DataProcessor(config)
        with pytest.raises(DataQualityError) as exc_info:
            processor.process(df_with_missing)
        assert "Data validation failed" in str(exc_info.value)
        assert ("Missing values in required column: transaction_amount"
                in exc_info.value.failed_checks)

    def test_duplicate_detection(self, sample_data, config):
        """Test detection of duplicate IDs."""
        # Create a copy with a duplicate transaction_id
        df_with_duplicates = sample_data.copy()
        df_with_duplicates.loc[5] = [3, 103, 175.0, '2023-01-04']

        processor = DataProcessor(config)
        with pytest.raises(DataQualityError) as exc_info:
            processor.process(df_with_duplicates)
        assert "Data validation failed" in str(exc_info.value)
        assert ("Duplicate values in ID column: transaction_id"
                in exc_info.value.failed_checks)

    def test_column_filtering(self, sample_data, config):
        """Test that only specified columns are included in result."""
        # Modify config to select only two columns
        config_subset = config.copy()
        config_subset['columns'] = ['user_id', 'transaction_amount']
        config_subset.pop('aggregation', None)
        config_subset.pop('group_by', None)

        processor = DataProcessor(config_subset)
        result = processor.process(sample_data)

        assert set(result.columns) == {'user_id', 'transaction_amount'}
        assert 'transaction_date' not in result.columns
        assert len(result) == len(sample_data)

    def test_group_aggregation(self, config):
        """Test grouping and aggregation functionality."""
        # Create data with duplicate user_ids to test aggregation
        df = pd.DataFrame({
            'user_id': [1, 1, 2, 2, 3],
            'transaction_id': [101, 102, 103, 104, 105],
            'transaction_amount': [100.0, 150.0, 200.0, 250.0, 300.0],
            'transaction_date': ['2023-01-01', '2023-01-02', '2023-01-01',
                                 '2023-01-03', '2023-01-02']
        })

        processor = DataProcessor(config)
        result = processor.process(df)

        # Check aggregation results (user 1: 100 + 150, user 2: 200 + 250)
        assert len(result) == 3  # 3 unique user_ids
        totals = result.set_index('user_id')['transaction_amount']
        assert totals[1] == 250.0
        assert totals[2] == 450.0
        assert totals[3] == 300.0
```
Note: These Python standards should be enforced through automated tools in your CI/CD pipeline:
- Black - code formatting
- isort - import sorting
- flake8 - style guide enforcement
- mypy - static type checking
- pytest - unit and integration testing
- pytest-cov - test coverage measurement
- pre-commit - Git hooks to enforce standards