Introduction
To kick off this intriguing and challenging journey, we'll start by building the AI service, the final component of our system's architecture. We'll harness the power of asynchronous programming in Python via `asyncio` and one of its most lightweight yet powerful asynchronous HTTP client/servers, `aiohttp`. It is a double-edged sword: it can be your HTTP/WebSocket client and, at the same time, your HTTP/WebSocket server. Unlike `requests`, which is synchronous, `aiohttp` supports both HTTP and WebSocket communication asynchronously, making it a worthy alternative to `FastAPI`, `Flask`, or even `Django` for async-based applications.
Let's put `aiohttp` to work in this article! 🚀
Prerequisites
Before diving in, I assume you've already:
- Created a project (e.g., `utility`)
- Set up a virtual environment
- Installed the required dependencies

If you haven't, copy the following into `requirements.txt`:
aiohttp==3.11
pandas==2.2
pdf2image==1.17
pytesseract==0.3
transformers==4.48
python-dotenv==1.0
scikit-learn==1.6
torch @ https://download.pytorch.org/whl/cpu/torch-2.5.1%2Bcpu-cp312-cp312-linux_x86_64.whl
torchvision @ https://download.pytorch.org/whl/cpu/torchvision-0.20.1%2Bcpu-cp312-cp312-linux_x86_64.whl
then, with your project's virtual environment activated, run:
(virtualenv)$ pip install -r requirements.txt
Notice the versions of `torch` and `torchvision` used here. I used CPU-only builds since I deployed this on a CPU-based server. If you have a GPU-enabled server or development environment, you can install the standard versions without the URL specifier (`@ https://download.pytorch.org/whl/...`) which, by the way, is a nifty way to pin a library to a direct URL in `requirements.txt`.

**`torch` (v2.6):** Just as I write this, the PyTorch Foundation announced the release of PyTorch® 2.6, which debuts support for Python 3.13 along with some performance improvements. Consider using it instead.
Source code
An AI-powered financial behavior analyzer and advisor written in Python (aiohttp) and TypeScript (ExpressJS & SvelteKit with Svelte 5)
Implementation
Let's now go into the meat of this article.
Step 1: Spawning an `aiohttp` Server
Create app.py
and populate it with the following:
import os
from asyncio import Lock
from aiohttp import WSCloseCode, WSMsgType, web
from aiohttp.web import Request, Response, WebSocketResponse
from utils.analyzer import analyze_transactions
from utils.extract_text import extract_text_from_pdf
from utils.settings import base_settings
from utils.summarize import summarize_transactions
from utils.websocket import WebSocketManager
# Registry of active WebSocket connections (a set prevents duplicates)
ws_connections: set[WebSocketResponse] = set()
ws_lock = Lock()
async def start_background_tasks(app):
"""Initialize application background tasks."""
app['ws_connections'] = ws_connections
app['ws_lock'] = ws_lock
async def cleanup_background_tasks(app):
"""Cleanup application resources."""
await cleanup_ws(app)
async def cleanup_ws(app):
"""Cleanup WebSocket connections on shutdown."""
async with ws_lock:
connections = set(ws_connections) # Create a copy to iterate safely
for ws in connections:
            await ws.close(code=WSCloseCode.GOING_AWAY, message=b'Server shutdown')
ws_connections.clear()
async def extract_text(request: Request) -> Response:
try:
base_settings.logger.info('Received text extraction request')
reader = await request.multipart()
field = await reader.next()
        if field is None or field.name != 'file':
base_settings.logger.warning('No file field in request')
return web.json_response({'error': 'No file uploaded'}, status=400)
# Read the file content
base_settings.logger.info('Reading uploaded file')
file_content: bytes = await field.read()
text: str = await extract_text_from_pdf(file_content)
base_settings.logger.info('Successfully processed request')
return web.json_response({'text': text})
except Exception as e:
base_settings.logger.error(
f'Request processing failed: {str(e)}', exc_info=True
)
return web.json_response({'error': str(e)}, status=500)
async def websocket_handler(request: Request) -> WebSocketResponse:
"""WebSocket handler for real-time communication."""
ws = web.WebSocketResponse()
await ws.prepare(request)
async with ws_lock:
ws_connections.add(ws)
ws_manager = WebSocketManager(ws)
await ws_manager.prepare()
base_settings.logger.info('WebSocket connection established')
try:
async for msg in ws:
if msg.type == WSMsgType.TEXT:
try:
data = msg.json()
if data.get('action') == 'analyze':
result = await analyze_transactions(
data.get('transactions'), ws_manager
)
await ws_manager.send_progress(
'Analysis complete', 1.0, 'Analysis'
)
await ws_manager.send_result(
result, 'Analysis', 'analysis_complete'
)
elif data.get('action') == 'summary':
result = await summarize_transactions(
data.get('transactions'), ws_manager
)
await ws_manager.send_progress(
'Summary complete', 1.0, 'Summarize'
)
await ws_manager.send_result(
result, 'Summarize', 'summary_complete'
)
else:
await ws_manager.send_result(
{'message': 'Unknown action'}, 'Error', 'error'
)
except Exception as e:
base_settings.logger.error(f'Message processing error: {str(e)}')
await ws_manager.send_result({'error': str(e)}, 'Error', 'error')
elif msg.type == WSMsgType.ERROR:
base_settings.logger.error(f'WebSocket error: {ws.exception()}')
finally:
async with ws_lock:
            ws_connections.discard(ws)  # discard() is safe if cleanup already removed it
await ws.close()
base_settings.logger.info('WebSocket connection closed')
return ws
def init_app() -> web.Application:
app = web.Application()
app.router.add_post('/extract-text', extract_text)
app.router.add_get('/ws', websocket_handler)
# Add startup/cleanup handlers
app.on_startup.append(start_background_tasks)
app.on_cleanup.append(cleanup_background_tasks)
return app
if __name__ == '__main__':
app = init_app()
try:
web.run_app(
app,
host='0.0.0.0',
port=int(os.environ.get('PORT', 5173)),
)
except KeyboardInterrupt:
base_settings.logger.info('Received keyboard interrupt...')
except Exception as e:
base_settings.logger.error(f'Server error: {e}')
finally:
base_settings.logger.info('Server shutdown complete.')
That was a long stretch of code but, looking closely, it's very straightforward. We start by registering all our WebSocket connections in a set (to prevent duplicates). We could have gone with the typed `AppKey` approach instead:
...
import weakref
...
# A typed application key; a WeakSet lets closed connections be
# garbage-collected without manual bookkeeping
ws_connections = web.AppKey('ws_connections', weakref.WeakSet)
...
but I went for the low-level approach: controlling startup myself and using async locks for safe concurrent access, cleanup on shutdown, and so on. You can use the `AppKey` approach instead. Then comes the `extract_text` route handler. It's the handler that receives the user's transactions PDF file, requires the multipart field to be named `file` (this is my preference; you can allow any field name), and then lets `extract_text_from_pdf` do its magic of extracting text from the file and responds with the extracted text. Let's see how `extract_text_from_pdf` does this.
Step 2: Utility package - `extract_text.py`
import os
import tempfile
import pytesseract
from pdf2image import convert_from_path
from .settings import base_settings as settings
async def extract_text_from_pdf(pdf_file: bytes) -> str:
settings.logger.info('Starting PDF text extraction')
# Create temporary file to store uploaded PDF
with tempfile.NamedTemporaryFile(delete=False, suffix='.pdf') as tmp_pdf:
tmp_pdf.write(pdf_file)
tmp_path: str = tmp_pdf.name
settings.logger.debug(f'Created temporary file: {tmp_path}')
try:
        # Convert each PDF page to an image (pdf2image defaults to 200 DPI)
settings.logger.info('Converting PDF to images')
pages = convert_from_path(tmp_path)
settings.logger.debug(f'Converted {len(pages)} pages')
# Extract text
text_data: str = ''
for i, page in enumerate(pages, 1):
settings.logger.debug(f'Processing page {i}')
text: str = pytesseract.image_to_string(page)
text_data += text + '\n'
settings.logger.info('Text extraction completed successfully')
return text_data
except Exception as e:
settings.logger.error(f'Error during text extraction: {str(e)}', exc_info=True)
raise
finally:
# Clean up temporary file
settings.logger.debug(f'Cleaning up temporary file: {tmp_path}')
os.unlink(tmp_path)
It uses Google's Tesseract OCR engine via the `pytesseract` library to extract text from the file. Although OCR can, in principle, read text from many document types, it worked better for me with images via `image_to_string`, hence the intermediate step of converting the PDF file to images before feeding them to OCR. The temporary file exists only because `convert_from_path` requires a filesystem path, and we don't want to persist anyone's file on our end. A better option is `convert_from_bytes` from `pdf2image`, which doesn't require creating a temporary file at all.
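Here's a minimal sketch of that variant, assuming the same `pytesseract` setup; `extract_text_from_pdf_bytes` is a hypothetical name:

```python
import asyncio

import pytesseract
from pdf2image import convert_from_bytes


async def extract_text_from_pdf_bytes(pdf_file: bytes) -> str:
    # Rasterize the PDF entirely in memory -- no temporary file needed
    pages = await asyncio.to_thread(convert_from_bytes, pdf_file)
    # Run the blocking OCR calls in a worker thread to keep the event loop free
    texts = await asyncio.to_thread(
        lambda: [pytesseract.image_to_string(page) for page in pages]
    )
    return '\n'.join(texts)
```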
Back in `app.py`, we also have a WebSocket handler that simply delegates actions to various submodules. Before that, though, I wrote a utility that manages all the app's WebSocket connections, so we can handle them elegantly without worrying about leaks and the like.
Step 3: Utility package - `websocket.py`
from aiohttp import web
from utils.settings import base_settings
class WebSocketManager:
def __init__(self, ws: web.WebSocketResponse):
self.ws = ws
self._ready = False
base_settings.logger.info(f'Initializing WebSocket manager: {ws}')
async def prepare(self):
self._ready = True
base_settings.logger.info('WebSocket manager ready')
    async def send_progress(self, message: str, progress: float, task_type: str | None = None):
if not self._ready or self.ws.closed:
base_settings.logger.warning(
'Cannot send progress - WebSocket not ready/closed'
)
return
try:
await self.ws.send_json(
{
'action': 'progress',
'message': message,
'progress': progress,
'taskType': task_type,
}
)
except Exception as e:
base_settings.logger.error(f'Error sending progress: {str(e)}')
async def send_result(self, result: dict, task_type: str, action: str):
if not self._ready or self.ws.closed:
base_settings.logger.warning(
'Cannot send result - WebSocket not ready/closed'
)
return
try:
await self.ws.send_json(
{
'action': action,
'result': result,
'taskType': task_type,
}
)
except Exception as e:
base_settings.logger.error(f'Error sending result: {str(e)}')
The app would work well without it, but I encountered a bug where the frontend made requests before connections were ready. So I figured this utility would help ensure a connection's readiness before accepting requests. I also implemented two methods, `send_result` and `send_progress`, which use aiohttp's WebSocket `send_json` to send analysis results and progress reports back to the requester(s), so they aren't kept in the dark when the analysis takes long (which it will on a CPU-only machine running several transformer models). A minimal client interaction might look like the sketch below.
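This sketch assumes the server runs locally on port 5173; the sample transaction is made up for illustration:

```python
import asyncio

import aiohttp

# Made-up transaction matching the fields validate_transaction expects
sample_tx = {
    '_id': '1', 'userId': 'u1', 'type': 'debit',
    'date': '2025-01-15T00:00:00', 'description': 'GROCERIES - MART',
    'amount': -54.2, 'balance': 945.8,
    'createdAt': '2025-01-15T00:00:00', 'updatedAt': '2025-01-15T00:00:00',
}


async def main():
    async with aiohttp.ClientSession() as session:
        async with session.ws_connect('ws://localhost:5173/ws') as ws:
            await ws.send_json({'action': 'analyze', 'transactions': [sample_tx]})
            async for msg in ws:
                if msg.type != aiohttp.WSMsgType.TEXT:
                    continue
                data = msg.json()
                if data['action'] == 'progress':
                    print(f"{data['taskType']}: {data['message']} ({data['progress']:.0%})")
                elif data['action'] == 'analysis_complete':
                    print(data['result'])
                    break

asyncio.run(main())
```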
Back in the handler, once the manager completes preparation we loop over incoming messages and delegate based on the `action` key. For now, we support the `analyze` and `summary` actions via `analyze_transactions` and `summarize_transactions` respectively. Let's see what they do.
Step 4: Utility package - `analyzer.py`
import asyncio
import os
from datetime import datetime
import numpy as np
import pandas as pd
import torch
from sklearn.ensemble import IsolationForest
from transformers import pipeline
from models.base import Transaction
from utils.settings import base_settings as settings
from utils.websocket import WebSocketManager
def get_device() -> tuple[torch.device, str]:
"""
Detect the best available device (GPU, MPS, or CPU) for PyTorch computations.
"""
if torch.cuda.is_available():
# Check if CUDA (NVIDIA GPU) is available
return torch.device('cuda'), 'CUDA (NVIDIA GPU)'
elif torch.backends.mps.is_available():
# Check if MPS (Metal Performance Shaders on Apple Silicon) is available
return torch.device('mps'), 'MPS (Apple Metal)'
else:
# Default to CPU
return torch.device('cpu'), 'CPU'
async def analyze_transactions(
    transactions: list[dict], ws_manager: WebSocketManager | None = None
) -> dict:
"""Analyze transactions and return insights with progress updates."""
try:
# Step 1: Validate and preprocess transactions
if ws_manager:
await ws_manager.send_progress(
'Validating transactions...', 0.1, 'Analysis'
)
if not transactions:
if ws_manager:
await ws_manager.send_progress(
                    'No transactions provided', 1.0, 'Analysis'
)
return {'error': 'No transactions provided'}
tx_objects = [
Transaction(
_id=t['_id'],
balance=float(t['balance']),
type=t['type'],
date=datetime.fromisoformat(t['date']),
description=t['description'],
amount=float(t['amount']),
userId=t['userId'],
createdAt=datetime.fromisoformat(t['createdAt']),
updatedAt=datetime.fromisoformat(t['updatedAt']),
)
for t in transactions
if validate_transaction(t)
]
if not tx_objects:
if ws_manager:
await ws_manager.send_progress(
'No valid transactions provided', 1.0, 'Analysis'
)
return {'error': 'No valid transactions provided'}
# Step 2: Classification
if ws_manager:
await ws_manager.send_progress(
'Classifying transactions...', 0.2, 'Analysis'
)
categories = await classify_transactions(tx_objects)
# Step 3: Anomaly Detection
if ws_manager:
await ws_manager.send_progress('Detecting anomalies...', 0.4, 'Analysis')
anomalies = await detect_anomalies(tx_objects)
# Step 4: Spending Analysis
if ws_manager:
await ws_manager.send_progress('Analyzing spending...', 0.6, 'Analysis')
spending_analysis = await analyze_spending(tx_objects)
# Step 5: Trend Prediction
if ws_manager:
await ws_manager.send_progress(
'Predicting spending trends...', 0.8, 'Analysis'
)
spending_trends = await predict_trends(tx_objects)
# Compile the results
result = {
'categories': categories,
'anomalies': anomalies,
'spending_analysis': spending_analysis,
'spending_trends': spending_trends,
}
settings.logger.info('Transaction analysis completed successfully')
return result
except Exception as e:
settings.logger.error(f'Error analyzing transactions: {str(e)}', exc_info=True)
if ws_manager:
            await ws_manager.send_progress('Analysis failed', 1.0, 'Analysis')
return {'error': f'Analysis failed: {str(e)}'}
def validate_transaction(t: dict) -> bool:
"""Validate transaction fields."""
try:
required_fields = {'date', 'description', 'amount', 'balance', 'type', 'userId'}
if not all(field in t for field in required_fields):
return False
# Validate amount and balance are numeric
float(t['amount'])
float(t['balance'])
# Validate date formats
datetime.fromisoformat(t['date'])
return True
except (ValueError, TypeError):
return False
async def classify_transactions(transactions: list[Transaction]) -> dict:
"""
Classify transactions using FinBERT.
"""
# Get device (GPU if available, otherwise CPU)
device, device_name = get_device()
settings.logger.info(f'Using device for classification: {device_name}')
# Load FinBERT classification pipeline
classifier = pipeline(
'zero-shot-classification',
model='yiyanghkust/finbert-tone',
        device=device,  # a torch.device works here and covers CUDA, MPS, and CPU
)
# Define financial categories
labels = os.getenv(
'LABELS', 'groceries,housing,transportation,entertainment,utilities,other'
).split(',')
# Prepare transaction descriptions for classification
descriptions = [tx.description.lower() for tx in transactions]
# Batch classify descriptions (Improves performance)
results = await asyncio.to_thread(
classifier, descriptions, labels, truncation=True, max_length=128
)
# Initialize categories and percentages
categories = {label: 0 for label in labels}
# Aggregate classification results
for tx, result in zip(transactions, results):
category = result['labels'][0]
categories[category] += abs(tx.amount)
total_spent = sum(categories.values())
# Calculate percentages
percentages = {
category: (amount / total_spent) * 100 if total_spent > 0 else 0
for category, amount in categories.items()
}
return {'categories': categories, 'percentages': percentages}
async def detect_anomalies(transactions: list[Transaction]) -> list[dict]:
"""Detect anomalies in transactions and provide specific reasons."""
# Extract transaction amounts and reshape for Isolation Forest
amounts = np.array([tx.amount for tx in transactions]).reshape(-1, 1)
model = IsolationForest(contamination=0.05, random_state=42)
anomalies = model.fit_predict(amounts)
# Calculate mean and standard deviation for dynamic reason generation
mean = np.mean(amounts)
std = np.std(amounts)
# Detect anomalies and generate specific reasons
anomaly_details = []
for tx, anomaly in zip(transactions, anomalies):
if anomaly == -1: # Anomaly detected
# Calculate the Z-score for the transaction
z_score = (tx.amount - mean) / std
# Generate a dynamic reason
if tx.amount > 0:
reason = (
f'Unusually high income of {tx.amount} detected '
f'(Z-score: {z_score:.2f}).'
)
elif tx.amount < 0 and abs(tx.amount) > abs(mean) + 2 * std:
reason = (
f'Unusually large expense of {tx.amount} detected '
f'(Z-score: {z_score:.2f}).'
)
elif tx.amount < 0 and 'luxury' in tx.description.lower():
reason = 'Uncommon luxury expense detected.'
elif tx.amount < 0 and 'groceries' in tx.description.lower():
reason = 'Unusually high grocery expense detected.'
else:
reason = f'Outlier transaction with amount {tx.amount} (Z-score: {z_score:.2f}).'
# Append the anomaly details
anomaly_details.append(
{
'date': tx.date.isoformat(),
'description': tx.description,
'amount': tx.amount,
'reason': reason,
}
)
return anomaly_details
async def analyze_spending(transactions: list[Transaction]) -> dict:
"""Generate spending analysis with cumulative balance"""
total_spent = sum(tx.amount for tx in transactions if tx.amount < 0)
total_income = sum(tx.amount for tx in transactions if tx.amount > 0)
# Create a DataFrame from transactions
df = pd.DataFrame([t.__dict__ for t in transactions])
# Ensure date parsing is correct
df['date'] = pd.to_datetime(df['date'])
df['date'] = df['date'].dt.tz_localize(None)
# Group by the date and calculate daily totals
daily_summary = df.groupby(df['date'].dt.date)['amount'].sum()
# Sort by date to ensure cumulative calculations are correct
df = df.sort_values(by='date')
    # The statement rows already carry a running balance, so reuse it as-is
    df['cumulative_balance'] = df['balance']
# Convert daily_summary to JSON-serializable format
daily_summary = {str(date): float(amount) for date, amount in daily_summary.items()}
# Prepare cumulative balance as JSON-serializable format
cumulative_balance = {
row['date'].strftime('%Y-%m-%d'): row['cumulative_balance']
for _, row in df.iterrows()
}
return {
'total_spent': abs(total_spent),
'total_income': total_income,
'savings_rate': (
((total_income + total_spent) / total_income) * 100 if total_income else 0
),
'daily_summary': daily_summary,
'cumulative_balance': cumulative_balance,
}
async def predict_trends(transactions: list[Transaction]) -> dict:
"""Predict future spending trends with enhanced analysis."""
if len(transactions) < 2:
return {'trend': 'Not enough data'}
# Convert dates to numeric for regression
dates = [(tx.date - transactions[0].date).days for tx in transactions]
amounts = [tx.amount for tx in transactions]
# Linear regression for trends
coeffs = np.polyfit(dates, amounts, 1)
trend = 'increasing' if coeffs[0] > 0 else 'decreasing'
# Include confidence interval (optional)
slope, intercept = coeffs
trend_line = [slope * x + intercept for x in dates]
# Estimated monthly spend
df = pd.DataFrame([t.__dict__ for t in transactions])
df['date'] = pd.to_datetime(df['date'])
df['date'] = df['date'].dt.tz_localize(None)
months = len(df['date'].dt.to_period('M').unique())
return {
'trend': trend,
'trend_slope': slope,
'estimated_monthly_spend': abs(
sum(tx.amount for tx in transactions if tx.amount < 0)
)
/ (months or 1),
}
It's full of async functions, each handling a specific part of the analysis (not counting the transaction validation and device detection logic). A function of particular interest is `classify_transactions`, which uses `yiyanghkust/finbert-tone`, a large language model for extracting information from financial text, to perform zero-shot classification of the transactions, mostly into:
LABELS=groceries,school,housing,transportation,gadgets,entertainment,utilities,credit cards,miscellaneous,dining out,healthcare,insurance,savings,investments,childcare,travel,personal care,debts,charity,taxes,subscriptions,streaming services,home maintenance,shopping,pets,fitness,hobbies,gifts
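I load these from the environment; since `python-dotenv` is in our requirements, the loading presumably looks something like this sketch (the `.env` handling lives in `utils/settings.py`, which isn't shown here, so the names are assumptions):

```python
import os

from dotenv import load_dotenv

load_dotenv()  # reads key=value pairs from a .env file into the environment

# Fall back to a short default list if LABELS isn't set
labels = os.getenv('LABELS', 'groceries,housing,utilities,other').split(',')
```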
Realistically, it'd be more accurate to fine-tune a barebones Bidirectional Encoder Representations from Transformers (BERT) model such as `RoBERTa` or `DistilBERT` on our own data, but my excuse was a lack of adequate data and GPU time (Google Colab isn't enough).
If you want to explore the ideas behind fine-tuning and building Natural Language Processing models with transformers from the ground up, we can collaborate to create something like `FinBERT` but for account statements and the like. Please reach out.
There are other functions that flag "abnormal" incomes/expenses, using `IsolationForest` for the detection and Z-scores to generate human-readable reasons for each flag. Cool stuff! There are also functions for spending analysis and the like, but they are pretty basic.
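If you haven't met `IsolationForest` before, a toy run makes its output obvious; the numbers below are made up:

```python
import numpy as np
from sklearn.ensemble import IsolationForest

# Five daily expenses; the last one is clearly out of pattern
amounts = np.array([-50.0, -45.0, -60.0, -55.0, -5000.0]).reshape(-1, 1)
model = IsolationForest(contamination=0.2, random_state=42)
print(model.fit_predict(amounts))  # e.g. [ 1  1  1  1 -1] -- -1 flags outliers
```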
**`amount` assumption:** All the analysis here assumes that income has a positive `amount` while expenses have negative amounts. If your data differs from this assumption, you may need to modify the code or the data, depending on your preference.
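For example, if your export stores every amount as a positive number and marks direction in the `type` column, a small normalization step can bring it in line (a sketch, assuming `type` holds 'credit'/'debit'):

```python
import pandas as pd


def normalize_amounts(df: pd.DataFrame) -> pd.DataFrame:
    """Make debits negative and credits positive, per this article's convention."""
    df = df.copy()
    debit = df['type'].str.lower().eq('debit')
    df.loc[debit, 'amount'] = -df.loc[debit, 'amount'].abs()
    df.loc[~debit, 'amount'] = df.loc[~debit, 'amount'].abs()
    return df
```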
Before we round up, let's see what `utils/summarize.py` does.
Step 5: Utility package - `summarize.py`
from datetime import datetime
import pandas as pd
from models.base import Transaction
from utils.analyzer import validate_transaction
from utils.settings import base_settings as settings
from utils.websocket import WebSocketManager
async def summarize_transactions(
    transactions: list[dict], ws_manager: WebSocketManager | None = None
) -> dict:
"""Summarize transaction data."""
try:
# Validate and convert transactions to objects
if ws_manager:
await ws_manager.send_progress(
'Validating transactions...', 0.1, 'Summarize'
)
if not transactions:
if ws_manager:
await ws_manager.send_progress(
'No transactions provided', 1.0, 'Summarize'
)
return {'error': 'No transactions provided'}
tx_objects = [
Transaction(
_id=t['_id'],
balance=float(t['balance']),
type=t['type'],
date=datetime.fromisoformat(t['date']),
description=t['description'],
amount=float(t['amount']),
userId=t['userId'],
createdAt=datetime.fromisoformat(t['createdAt']),
updatedAt=datetime.fromisoformat(t['updatedAt']),
)
for t in transactions
if validate_transaction(t)
]
if not tx_objects:
settings.logger.warning('No valid transactions provided')
if ws_manager:
await ws_manager.send_progress(
'No valid transactions provided', 1.0, 'Summarize'
)
return {'error': 'No valid transactions provided'}
# Step 1: Calculate totals
if ws_manager:
await ws_manager.send_progress('Calculating totals...', 0.2, 'Summarize')
total_spent = sum(tx.amount for tx in tx_objects if tx.amount < 0)
total_income = sum(tx.amount for tx in tx_objects if tx.amount > 0)
# Step 2: Calculate additional metrics
if ws_manager:
await ws_manager.send_progress('Calculating average..', 0.35, 'Summarize')
total_savings = total_income + total_spent
total_transactions = len(tx_objects)
expense_count = sum(1 for tx in tx_objects if tx.amount < 0)
income_count = sum(1 for tx in tx_objects if tx.amount > 0)
avg_expense = abs(total_spent / expense_count) if expense_count > 0 else 0
avg_income = total_income / income_count if income_count > 0 else 0
# Step 3: Calculate largest transactions and date range
if ws_manager:
await ws_manager.send_progress(
'Identifying largest transactions...', 0.5, 'Summarize'
)
start_date = min(tx.date for tx in tx_objects)
end_date = max(tx.date for tx in tx_objects)
        # default=0 avoids a ValueError when there are no expenses or incomes
        largest_expense = min((tx.amount for tx in tx_objects if tx.amount < 0), default=0)
        largest_income = max((tx.amount for tx in tx_objects if tx.amount > 0), default=0)
# Step 4: Generate monthly summaries
if ws_manager:
await ws_manager.send_progress(
'Generating monthly summaries...', 0.7, 'Summarize'
)
df = pd.DataFrame([t.__dict__ for t in tx_objects])
df['date'] = pd.to_datetime(df['date'])
monthly_summary = (
df.groupby(df['date'].dt.to_period('M'))['amount'].sum().to_dict()
)
# Step 5: Analyze trends and changes
if ws_manager:
await ws_manager.send_progress('Analyzing trends...', 0.85, 'Summarize')
monthly_income = (
df[df['amount'] > 0].groupby(df['date'].dt.to_period('M'))['amount'].sum()
)
monthly_expense = (
df[df['amount'] < 0]
.groupby(df['date'].dt.to_period('M'))['amount']
.sum()
.abs()
)
monthly_savings = (monthly_income - monthly_expense).fillna(0)
income_trend = await calculate_trend(monthly_income)
expense_trend = await calculate_trend(monthly_expense)
savings_trend = await calculate_trend(monthly_savings)
income_change = await calculate_percentage_change(monthly_income)
expense_change = await calculate_percentage_change(monthly_expense)
savings_change = await calculate_percentage_change(monthly_savings)
savings_rate = (
((total_income + total_spent) / total_income) * 100 if total_income else 0
)
monthly_summary = {
str(month): {
'income': float(monthly_income.get(month, 0)),
'expenses': float(monthly_expense.get(month, 0)),
'savings': float(monthly_savings.get(month, 0)),
}
for month in monthly_income.index.union(monthly_expense.index)
}
# Compile summary results
summary = {
'income': {
'total': total_income,
'trend': income_trend,
'change': income_change,
},
'expenses': {
'total': abs(total_spent),
'trend': expense_trend,
'change': expense_change,
},
'savings': {
'total': total_savings,
'trend': savings_trend,
'change': savings_change,
},
'total_transactions': total_transactions,
'expense_count': expense_count,
'income_count': income_count,
'avg_expense': avg_expense,
'avg_income': avg_income,
'start_date': start_date.isoformat(),
'end_date': end_date.isoformat(),
'largest_expense': largest_expense,
'largest_income': largest_income,
'savings_rate': savings_rate,
'monthly_summary': monthly_summary,
}
settings.logger.info('Transaction summarization completed successfully')
return summary
except Exception as e:
settings.logger.error(f'Error summarizing transactions: {str(e)}')
if ws_manager:
            await ws_manager.send_progress('Summarization failed', 1.0, 'Summarize')
return {'error': f'Summarization failed: {str(e)}'}
async def calculate_trend(monthly_data: pd.Series) -> str:
"""
Calculate trend ('up', 'down', 'neutral') based on monthly data.
"""
if len(monthly_data) < 2:
return 'neutral'
    recent_avg = monthly_data.iloc[-2:].mean()
    earlier_avg = monthly_data.iloc[:-2].mean()
if recent_avg > earlier_avg:
return 'up'
elif recent_avg < earlier_avg:
return 'down'
else:
return 'neutral'
async def calculate_percentage_change(monthly_data: pd.Series) -> float:
"""
Calculate the percentage change from the highest monthly value to the average of the last two months.
"""
if len(monthly_data) < 2:
return 0
highest_value = monthly_data.max()
    recent_avg = monthly_data.iloc[-2:].mean()
return (
((recent_avg - highest_value) / highest_value) * 100
if highest_value != 0
else 0
)
Though long (due to repeated code that could have been factored into reusable functions), the module just produces basic summaries of your transactions. It's comprehensive but can be readily extended.
In both analyses, I endeavored to keep connections updated by sending `progress` actions at intervals, so requesters can follow how far along the task is.
Now, in `app.py`, using `init_app`, we created an `Application` instance and added routes to it. We could have used aiohttp's route-decorator style instead, as sketched below. We also didn't forget some housekeeping: cleaning up the background tasks we started earlier whenever the app exits via different interrupts. That's it!
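Here's what that decorator style could look like, a sketch using aiohttp's `web.RouteTableDef`:

```python
from aiohttp import web

routes = web.RouteTableDef()


@routes.post('/extract-text')
async def extract_text(request: web.Request) -> web.Response:
    ...  # same handler body as before


@routes.get('/ws')
async def websocket_handler(request: web.Request) -> web.WebSocketResponse:
    ...  # same handler body as before


def init_app() -> web.Application:
    app = web.Application()
    app.add_routes(routes)  # registers every decorated handler at once
    return app
```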
There were some submodules, `utils/settings.py`, `models/base.py`, etc., that weren't discussed. They are basic setups for instrumentation (logging) and the like. Others belong to another system (maybe discussed in another series). In all, they are very basic.
Outro
Enjoyed this article? I'm a Software Engineer and Technical Writer actively seeking new opportunities, particularly in areas related to web security, finance, healthcare, and education. If you think my expertise aligns with your team's needs, let's chat! You can find me on LinkedIn and X. I am also an email away.
If you found this article valuable, consider sharing it with your network to help spread the knowledge!