At MRH Trowe, we've integrated Matillion with Snowflake and AWS to streamline our ELT and ETL data integrations (Extract, Load, Transform). Matillion offers a low-code/no-code solution for building these integrations, providing both SQL-based components for transformation pipelines (Transform) and orchestration pipelines for data integration and automation (Extract and Load).
While Matillion ships with many pre-built components to connect with various sources, there are situations where Python components are necessary for more complex tasks or to implement features that are not supported out of the box. This combination allows us to load data from different sources into Snowflake, leveraging its robust data platform capabilities.
Given Matillion's focus on low-code integration, I often wondered how to streamline Python development for use within Matillion. Considering that Matillion operates on Docker in AWS (e.g., with Fargate), I decided it was best to take ownership of the container by creating a custom Matillion Helper Package for Python to ease integration. For this, I utilized Amazon Q Developer.
Amazon Q can be used with VS Code and its server version via a dedicated free extension. You can sign in with your AWS Builder ID or leverage SSO (AWS IAM Identity Center) to streamline IAM-related tasks for Amazon Q Developer. After installing the extension, you're provided with a chat interface that facilitates an agentic development lifecycle.
Creating a Custom PyPI Package with Amazon Q
I set out to create my first Python package as a companion to the Matillion Dockerfile with the following structure:
matillion-helper/
├── matillion_helper/
│ ├── __init__.py
│ └── helper.py
├── setup.py
├── pyproject.toml
├── README.md
├── Dockerfile
└── .dockerignore
To generate this structure, I prompted Amazon Q: “Create the needed minimal setup for PyPi package to be installed into a Docker container. Name the package matillion_helper.” With Claude 3.7 Sonnet, this took only a matter of seconds.
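For reference, the packaging part is mostly boilerplate. A minimal setup.py along these lines is enough to make the package pip-installable during the Docker image build; the version and dependency list below are illustrative assumptions, not the exact output Amazon Q generated:

from setuptools import setup, find_packages

# Illustrative metadata - adjust name, version, and dependencies to your environment
setup(
    name="matillion_helper",
    version="0.1.0",
    packages=find_packages(),
    python_requires=">=3.8",
    install_requires=[
        "boto3",     # AWS API access (STS AssumeRole)
        "requests",  # Power BI REST API calls
        "msal",      # Azure AD authentication
    ],
)

In the Dockerfile, a plain pip install . against this package then makes the helpers available to every Matillion Python component in the container.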
Use Case: Consolidated Logging
Once the package was set up, I focused on creating a module for consolidated logging. This module logs messages from any function up to the main execution, making logging consistent across the package and all Matillion projects. I used the following prompt: "Create a logging_config.py file for my matillion_helper package with the following features:
- Functions to configure logging with console and file output
- Support for log rotation with configurable max size and backup count
- Environment variable support for log directory and log level
- A get_logger convenience function
- Proper error handling for file permissions
- Type hints for all functions"
As a result, I got the following code:
import logging
import logging.handlers
import os
import sys
from pathlib import Path
from typing import Any, Optional


def configure_logging(
    logger_name: Optional[str] = None,
    log_level: int = logging.INFO,
    log_file: Optional[str] = None,
    log_format: str = "[%(asctime)s] [%(levelname)s] [%(name)s] - %(message)s",
    date_format: str = "%Y-%m-%d %H:%M:%S",
    console_output: bool = True,
    max_bytes: int = 10485760,  # 10MB
    backup_count: int = 5,
    propagate: bool = False,
) -> logging.Logger:
    """Configure logging for a module.

    Args:
        logger_name: Name of the logger (defaults to the calling module name)
        log_level: Logging level (default: INFO)
        log_file: Log file name; if set, a rotating file handler is added
        log_format: Format string for log messages
        date_format: Format string for timestamps
        console_output: Whether to output logs to console
        max_bytes: Maximum size of log file before rotation
        backup_count: Number of backup log files to keep
        propagate: Whether to propagate logs to parent loggers

    Returns:
        logging.Logger: Configured logger instance
    """
    # If no logger name is provided, use the calling module's name
    if logger_name is None:
        import inspect

        frame = inspect.stack()[1]
        module = inspect.getmodule(frame[0])
        logger_name = module.__name__ if module else "matillion_helper"

    # Get or create the logger
    logger = logging.getLogger(logger_name)

    # Clear any existing handlers to avoid duplicate logs
    if logger.hasHandlers():
        logger.handlers.clear()

    # Set the log level
    logger.setLevel(log_level)
    logger.propagate = propagate

    # Create formatter
    formatter = logging.Formatter(log_format, date_format)

    # Add console handler if requested
    if console_output:
        console_handler = logging.StreamHandler(sys.stdout)
        console_handler.setFormatter(formatter)
        console_handler.setLevel(log_level)
        logger.addHandler(console_handler)

    # Add a rotating file handler if a log file is requested
    if log_file:
        # The MATILLION_LOG_DIR environment variable (defaulting to the working
        # directory) selects where log files are written
        log_dir = Path(os.environ.get("MATILLION_LOG_DIR", "."))
        log_path = log_dir / log_file
        try:
            log_dir.mkdir(parents=True, exist_ok=True)
            file_handler = logging.handlers.RotatingFileHandler(
                log_path, maxBytes=max_bytes, backupCount=backup_count
            )
            file_handler.setFormatter(formatter)
            file_handler.setLevel(log_level)
            logger.addHandler(file_handler)
        except OSError as exc:
            # Fall back to console-only logging if the file cannot be written
            logger.warning("Could not attach file handler for %s: %s", log_path, exc)

    return logger


def get_logger(
    name: Optional[str] = None,
    log_level: Optional[int] = None,
    **kwargs: Any,
) -> logging.Logger:
    """Get a configured logger.

    This is a convenience function for getting a logger with the default configuration.

    Args:
        name: Logger name (defaults to the calling module name)
        log_level: Override default log level
        **kwargs: Additional arguments to pass to configure_logging

    Returns:
        logging.Logger: Configured logger instance
    """
    # Get environment variable for log level, defaulting to INFO
    env_log_level = os.environ.get("MATILLION_LOG_LEVEL", "INFO")

    # Map string log levels to logging constants
    log_level_map = {
        "DEBUG": logging.DEBUG,
        "INFO": logging.INFO,
        "WARNING": logging.WARNING,
        "ERROR": logging.ERROR,
        "CRITICAL": logging.CRITICAL,
    }

    # Use provided log_level, or get from environment, or default to INFO
    effective_log_level = log_level or log_level_map.get(env_log_level, logging.INFO)

    return configure_logging(logger_name=name, log_level=effective_log_level, **kwargs)
This implementation ensured that the logging mechanism was centralized and not dependent on individual developers' preferences.
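In a Matillion Python component, using the logger then boils down to a couple of lines. A minimal sketch (the script name is just an example):

from matillion_helper.logging_config import get_logger

# Log level is picked up from MATILLION_LOG_LEVEL if set, otherwise INFO
logger = get_logger("orders_ingest")

logger.info("Starting incremental load")
logger.debug("Only visible when MATILLION_LOG_LEVEL is set to DEBUG")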
Use Case: Interaction with AWS API
Another regular task we conduct with Matillion involves IAM context switches, which let us eliminate static credentials. To achieve this, we implemented the ability to make STS (Security Token Service) AssumeRole calls. For cross-account access, the Matillion instance role must be allowed to call sts:AssumeRole on the target role, and the target role's trust policy must permit the Matillion account to assume it.
Using a prompt like “Create a function to make STS AssumeRole possible in a generic way,” I developed a module that utilizes the existing logging_config.py for consistent logging throughout the implementation.
import boto3
from botocore.exceptions import ClientError, ParamValidationError
from typing import Dict, Optional, Any

from matillion_helper.logging_config import get_logger

# Get configured logger for this module
logger = get_logger(__name__)


def assume_role(
    role_arn: str,
    session_name: str = "AssumedRoleSession",
    duration_seconds: int = 3600,
    external_id: Optional[str] = None,
    region_name: Optional[str] = None,
    sts_client: Optional[Any] = None,
) -> Dict[str, Any]:
    """Assume an AWS IAM role and return temporary credentials.

    This function allows for assuming an IAM role and obtaining temporary credentials
    that can be used to make AWS API calls with the permissions of the assumed role.

    Args:
        role_arn (str): The Amazon Resource Name (ARN) of the role to assume.
        session_name (str, optional): An identifier for the assumed role session.
            Defaults to "AssumedRoleSession".
        duration_seconds (int, optional): The duration, in seconds, of the role session.
            Defaults to 3600 (1 hour).
        external_id (str, optional): A unique identifier that might be required when
            assuming a role in another account. Defaults to None.
        region_name (str, optional): The AWS region to connect to. Defaults to None,
            which uses the default region from the AWS configuration.
        sts_client (boto3.client, optional): An existing STS client to use.
            If not provided, a new client will be created.

    Returns:
        Dict[str, Any]: A dictionary containing the temporary credentials and session information.
            The dictionary includes:
            - AccessKeyId: The access key ID for the temporary credentials
            - SecretAccessKey: The secret access key for the temporary credentials
            - SessionToken: The session token for the temporary credentials
            - Expiration: When the temporary credentials expire

    Raises:
        ClientError: If AWS returns an error during role assumption.
        ParamValidationError: If the parameters provided are invalid.
        Exception: Any other exception that occurs during role assumption.
    """
    try:
        # Create STS client if not provided
        if sts_client is None:
            sts_client = boto3.client("sts", region_name=region_name)

        # Prepare assume role parameters
        assume_role_params = {
            "RoleArn": role_arn,
            "RoleSessionName": session_name,
            "DurationSeconds": duration_seconds,
        }

        # Add external ID if provided
        if external_id:
            assume_role_params["ExternalId"] = external_id

        # Assume the role
        response = sts_client.assume_role(**assume_role_params)

        # Extract credentials from the response
        credentials = response["Credentials"]
        logger.info(f"Successfully assumed role: {role_arn}")
        return credentials

    except ClientError as e:
        logger.exception(f"Failed to assume role due to AWS error: {e}")
        raise
    except ParamValidationError as e:
        logger.exception(f"Invalid parameters for assume_role: {e}")
        raise
    except Exception as e:
        logger.exception(f"Unexpected error assuming role: {e}")
        raise
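The returned credentials can then be plugged into a boto3 session to act in the target account. A minimal sketch (the module path, role ARN, and bucket name are placeholders/assumptions):

import boto3

from matillion_helper.sts import assume_role  # module name is an assumption

# Assume a role in the target account (placeholder ARN)
credentials = assume_role(
    role_arn="arn:aws:iam::123456789012:role/matillion-cross-account",
    session_name="matillion-etl",
)

# Build a session with the temporary credentials and use it like any other boto3 session
session = boto3.Session(
    aws_access_key_id=credentials["AccessKeyId"],
    aws_secret_access_key=credentials["SecretAccessKey"],
    aws_session_token=credentials["SessionToken"],
)
s3 = session.client("s3")
print(s3.list_objects_v2(Bucket="example-target-bucket", MaxKeys=5))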
Use Case: Refreshing Power BI Semantic Models
Another useful feature we developed is the ability to kick off dataset refreshes for Power BI from Matillion. This turns a schedule-based dependency between Matillion and Power BI semantic models into an event-based approach, resulting in more consistent behavior and fewer error sources. However, the Power BI admin settings must be configured to allow API interaction, and there are restrictions on refreshes, which are limited to eight per day. Once the prerequisites are met, you can leverage event-based refreshes for all Power BI workspaces and semantic models by adding the service principal to the workspaces and assigning it Contributor rights.
Using a prompt like “Create a Python function to update Power BI datasets in a workspace”, I implemented a function that interacts with the Power BI REST API to refresh datasets, ensuring that we can keep reports up to date directly from our data integration workflows.
import requests
import msal

from matillion_helper.logging_config import get_logger

# Get configured logger for this module
logger = get_logger(__name__)


def update_powerbi_datasets(workspace_id, client_id, client_secret, tenant_id):
    """
    Update Power BI datasets in a specified workspace.

    Args:
        workspace_id (str): The ID of the Power BI workspace
        client_id (str): Azure AD application client ID
        client_secret (str): Azure AD application client secret
        tenant_id (str): Azure AD tenant ID

    Returns:
        dict: Result of the refresh operation, or None if authentication or the
            dataset lookup fails
    """
    # Get access token using MSAL
    authority = f"https://login.microsoftonline.com/{tenant_id}"
    app = msal.ConfidentialClientApplication(
        client_id=client_id,
        client_credential=client_secret,
        authority=authority
    )

    # Acquire token for Power BI service
    scopes = ["https://analysis.windows.net/powerbi/api/.default"]
    result = app.acquire_token_for_client(scopes=scopes)

    if "access_token" not in result:
        logger.error(f"Error getting token: {result.get('error')}")
        logger.error(f"Error description: {result.get('error_description')}")
        return None

    access_token = result["access_token"]

    # Get datasets in the workspace
    headers = {
        "Authorization": f"Bearer {access_token}",
        "Content-Type": "application/json"
    }
    datasets_url = f"https://api.powerbi.com/v1.0/myorg/groups/{workspace_id}/datasets"
    datasets_response = requests.get(datasets_url, headers=headers)

    if datasets_response.status_code != 200:
        logger.error(f"Error getting datasets: {datasets_response.text}")
        return None

    datasets = datasets_response.json()["value"]
    refresh_results = {}

    # Refresh each dataset
    for dataset in datasets:
        dataset_id = dataset["id"]
        dataset_name = dataset["name"]
        refresh_url = f"https://api.powerbi.com/v1.0/myorg/groups/{workspace_id}/datasets/{dataset_id}/refreshes"
        refresh_response = requests.post(refresh_url, headers=headers)

        if refresh_response.status_code == 202:
            refresh_results[dataset_name] = "Refresh triggered successfully"
        else:
            refresh_results[dataset_name] = f"Error: {refresh_response.text}"

    return refresh_results


# Example usage
if __name__ == "__main__":
    # Replace these with your actual values
    client_id = "client_id"
    client_secret = "client_secret"
    tenant_id = "tenant_id"
    workspace_id = "workspace_id"

    results = update_powerbi_datasets(workspace_id, client_id, client_secret, tenant_id)
    logger.info(results)
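Because the refresh call only returns 202 Accepted, it can be useful to poll the refresh history afterwards to confirm that a run actually completed. A hedged sketch that reuses the same headers as above (the helper name, timeout, and polling interval are my own choices, not part of the generated package):

import time
import requests


def wait_for_refresh(workspace_id, dataset_id, headers, timeout_seconds=600):
    """Poll the latest refresh entry of a dataset until it finishes or the timeout is hit."""
    url = f"https://api.powerbi.com/v1.0/myorg/groups/{workspace_id}/datasets/{dataset_id}/refreshes?$top=1"
    deadline = time.time() + timeout_seconds
    while time.time() < deadline:
        response = requests.get(url, headers=headers)
        response.raise_for_status()
        entries = response.json().get("value", [])
        status = entries[0]["status"] if entries else "Unknown"
        # "Unknown" means the refresh is still in progress, so keep polling
        if status in ("Completed", "Failed", "Disabled"):
            return status
        time.sleep(30)
    return "Timeout"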
Summary
By utilizing Amazon Q and creating a custom Matillion Python package, we gained much-needed flexibility in our development process. While this approach requires initial setup and containerization efforts, the benefits include a more integrated CI/CD process and the ability to manage Python scripts effectively within Matillion.
As of today, Matillion's own copilot (MAIA) does not help you create and maintain Python scripts. But by introducing Amazon Q Developer, you can overcome this high-code hurdle by providing meaningful building blocks and enabling your data engineers to leverage pure but predefined Python code.
Happy (vibed) Coding! :-)