Introduction

To kick off this intriguing and challenging journey, we'll start by building the AI service, the final component of our system's architecture. We'll harness the power of asynchronous programming in Python via asyncio and aiohttp, one of its most lightweight yet powerful asynchronous HTTP client/server libraries. aiohttp works both ways: it can be your HTTP/WebSocket client and, at the same time, your HTTP/WebSocket server. Unlike requests, which is a synchronous, HTTP-only client, aiohttp supports both HTTP and WebSocket communication asynchronously, making it a worthy alternative to FastAPI, Flask, or even Django for async-based applications.

Let's put aiohttp to work in this article! 🚀

Prerequisites

Before diving in, I assume you've already:

  • Created a project (e.g., utility)
  • Set up a virtual environment
  • Installed the required dependencies

If you haven't, copy the following into requirements.txt:

requirements.txt
txt
aiohttp==3.11
pandas==2.2
pdf2image==1.17
pytesseract==0.3
transformers==4.48
python-dotenv==1.0
scikit-learn==1.6
torch @ https://download.pytorch.org/whl/cpu/torch-2.5.1%2Bcpu-cp312-cp312-linux_x86_64.whl
torchvision @ https://download.pytorch.org/whl/cpu/torchvision-0.20.1%2Bcpu-cp312-cp312-linux_x86_64.whl
			

Then, with your project's virtual environment activated, run:

sh
(virtualenv)$ pip install -r requirements.txt
			

Notice the torch and torchvision versions used here. I used CPU-only builds because I deployed this on a CPU-only server; these particular wheels also target CPython 3.12 on Linux x86_64 (the cp312 and linux_x86_64 tags in the file names). If you have a GPU-enabled server or development environment, you can install the standard builds by dropping the URL specifier (@ https://download.pytorch.org/whl/...), which, by the way, is a nifty way to reference a package by URL in requirements.txt.

Note: You may prefer to develop using the latest torch (v2.6).

As I write this, the PyTorch Foundation has just announced the release of PyTorch® 2.6, which brings torch.compile support to Python 3.13 along with some performance improvements. Consider using it instead, together with the matching torchvision release (0.21).

Source code

Sirneij/finance-analyzer: An AI-powered financial behavior analyzer and advisor written in Python (aiohttp) and TypeScript (ExpressJS & SvelteKit with Svelte 5)


Implementation

Let's now go into the meat of this article.

Step 1: Spawning an `aiohttp` Server

Create app.py and populate it with the following:

app.py
py
import os
from asyncio import Lock

from aiohttp import WSCloseCode, WSMsgType, web
from aiohttp.web import Request, Response, WebSocketResponse

from utils.analyzer import analyze_transactions
from utils.extract_text import extract_text_from_pdf
from utils.settings import base_settings
from utils.summarize import summarize_transactions
from utils.websocket import WebSocketManager

# Every open WebSocket connection; the lock keeps concurrent handlers
# from mutating the set while shutdown cleanup iterates over it
ws_connections: set[WebSocketResponse] = set()
ws_lock = Lock()


async def start_background_tasks(app):
    """Initialize application background tasks."""
    app['ws_connections'] = ws_connections
    app['ws_lock'] = ws_lock


async def cleanup_background_tasks(app):
    """Cleanup application resources."""
    await cleanup_ws(app)


async def cleanup_ws(app):
    """Cleanup WebSocket connections on shutdown."""
    async with ws_lock:
        connections = set(ws_connections)  # Create a copy to iterate safely
        for ws in connections:
            # 1001 (Going Away) is the close code meant for server shutdown;
            # WSMsgType.CLOSE is a frame type, not a valid close code
            await ws.close(code=WSCloseCode.GOING_AWAY, message=b'Server shutdown')
        ws_connections.clear()


async def extract_text(request: Request) -> Response:
    try:
        base_settings.logger.info('Received text extraction request')
        reader = await request.multipart()
        field = await reader.next()

        if field is None or field.name != 'file':
            base_settings.logger.warning('No file field in request')
            return web.json_response({'error': 'No file uploaded'}, status=400)

        # Read the file content
        base_settings.logger.info('Reading uploaded file')
        file_content: bytes = await field.read()

        text: str = await extract_text_from_pdf(file_content)
        base_settings.logger.info('Successfully processed request')
        return web.json_response({'text': text})
    except Exception as e:
        base_settings.logger.error(
            f'Request processing failed: {str(e)}', exc_info=True
        )
        return web.json_response({'error': str(e)}, status=500)


async def websocket_handler(request: Request) -> WebSocketResponse:
    """WebSocket handler for real-time communication."""
    ws = web.WebSocketResponse()
    await ws.prepare(request)

    async with ws_lock:
        ws_connections.add(ws)
    ws_manager = WebSocketManager(ws)
    await ws_manager.prepare()

    base_settings.logger.info('WebSocket connection established')

    try:
        async for msg in ws:
            if msg.type == WSMsgType.TEXT:
                try:
                    data = msg.json()
                    if data.get('action') == 'analyze':
                        result = await analyze_transactions(
                            data.get('transactions'), ws_manager
                        )
                        await ws_manager.send_progress(
                            'Analysis complete', 1.0, 'Analysis'
                        )
                        await ws_manager.send_result(
                            result, 'Analysis', 'analysis_complete'
                        )
                    elif data.get('action') == 'summary':
                        result = await summarize_transactions(
                            data.get('transactions'), ws_manager
                        )
                        await ws_manager.send_progress(
                            'Summary complete', 1.0, 'Summarize'
                        )
                        await ws_manager.send_result(
                            result, 'Summarize', 'summary_complete'
                        )
                    else:
                        await ws_manager.send_result(
                            {'message': 'Unknown action'}, 'Error', 'error'
                        )
                except Exception as e:
                    base_settings.logger.error(f'Message processing error: {str(e)}')
                    await ws_manager.send_result({'error': str(e)}, 'Error', 'error')
            elif msg.type == WSMsgType.ERROR:
                base_settings.logger.error(f'WebSocket error: {ws.exception()}')
    finally:
        async with ws_lock:
            # discard(), not remove(): shutdown cleanup may have cleared the set already
            ws_connections.discard(ws)
        await ws.close()
        base_settings.logger.info('WebSocket connection closed')
    return ws


def init_app() -> web.Application:
    app = web.Application()
    app.router.add_post('/extract-text', extract_text)
    app.router.add_get('/ws', websocket_handler)

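    # Add startup/shutdown handlers. on_shutdown runs before aiohttp cancels
    # still-running handlers, so open WebSockets can be closed gracefully.
    app.on_startup.append(start_background_tasks)
    app.on_shutdown.append(cleanup_background_tasks)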

    return app


if __name__ == '__main__':
    app = init_app()
    try:
        web.run_app(
            app,
            host='0.0.0.0',
            port=int(os.environ.get('PORT', 5173)),
        )
    except KeyboardInterrupt:
        base_settings.logger.info('Received keyboard interrupt...')
    except Exception as e:
        base_settings.logger.error(f'Server error: {e}')
    finally:
        base_settings.logger.info('Server shutdown complete.')
			

That's a lot of code, but looking closely, it's straightforward. We start by registering all WebSocket connections in a set (which prevents duplicates). We could instead have used a typed application key holding a WeakSet, as aiohttp suggests:

app.py
py
...
import weakref
...

ws_connections = web.AppKey('ws_connections', weakref.WeakSet)

...

			

but I went for a lower-level approach: I control startup and shutdown myself and guard the set with an asyncio lock, which keeps concurrent handler coroutines from modifying it while the shutdown cleanup iterates over it. Feel free to use the suggested approach instead. Then comes the extract_text route handler. It receives the user's transactions PDF, insists that the multipart field carrying it is named file (that's my preference; you can accept any field name), and then lets extract_text_from_pdf do its magic: extracting the text from the file so the handler can respond with it. Let's see how extract_text_from_pdf does this.
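For illustration, here's the lock-guarded registry pattern described above, reduced to plain asyncio so it runs without a server. ConnectionRegistry is a hypothetical name, not something from the repo, and any object with an async close(code=..., message=...) method (such as aiohttp's WebSocketResponse) can be registered. Two details are worth copying: unregistering with discard() rather than remove(), because the shutdown path may already have emptied the set, and closing with code 1001 ("Going Away" in RFC 6455, exposed by aiohttp as WSCloseCode.GOING_AWAY). Unlike app.py, this sketch snapshots and clears the set under the lock and then closes connections outside it, so a handler's own cleanup never waits on the shutdown loop.

```python
import asyncio
from typing import Any


class ConnectionRegistry:
    """Lock-guarded set of open connections (hypothetical helper, not from the repo)."""

    def __init__(self) -> None:
        self._connections: set[Any] = set()
        self._lock = asyncio.Lock()

    async def add(self, conn: Any) -> None:
        async with self._lock:
            self._connections.add(conn)  # a set silently ignores duplicates

    async def discard(self, conn: Any) -> None:
        # discard() instead of remove(): shutdown may already have emptied the set
        async with self._lock:
            self._connections.discard(conn)

    async def close_all(self, code: int = 1001, message: bytes = b'Server shutdown') -> int:
        # Snapshot and clear under the lock, then close outside it, so a
        # handler calling discard() during shutdown never waits on this loop
        async with self._lock:
            snapshot = list(self._connections)
            self._connections.clear()
        for conn in snapshot:
            await conn.close(code=code, message=message)
        return len(snapshot)

    def __len__(self) -> int:
        return len(self._connections)
```

In app.py terms, websocket_handler would call add() after ws.prepare(request) and discard() in its finally block, while the shutdown hook calls close_all().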

Step 2: Utility package - `extract_text.py`

utils/extract_text.py
py
import os
import tempfile

import pytesseract
from pdf2image import convert_from_path

from .settings import base_settings as settings


async def extract_text_from_pdf(pdf_file: bytes) -> str:
    settings.logger.info('Starting PDF text extraction')
    # Create temporary file to store uploaded PDF
    with tempfile.NamedTemporaryFile(delete=False, suffix='.pdf') as tmp_pdf:
        tmp_pdf.write(pdf_file)
        tmp_path: str = tmp_pdf.name
        settings.logger.debug(f'Created temporary file: {tmp_path}')

    try:
        # Rasterize each page to an image (pdf2image defaults to 200 dpi)
        settings.logger.info('Converting PDF to images')
        pages = convert_from_path(tmp_path)
        settings.logger.debug(f'Converted {len(pages)} pages')

        # Extract text
        text_data: str = ''
        for i, page in enumerate(pages, 1):
            settings.logger.debug(f'Processing page {i}')
            text: str = pytesseract.image_to_string(page)
            text_data += text + '\n'

        settings.logger.info('Text extraction completed successfully')
        return text_data
    except Exception as e:
        settings.logger.error(f'Error during text extraction: {str(e)}', exc_info=True)
        raise
    finally:
        # Clean up temporary file
        settings.logger.debug(f'Cleaning up temporary file: {tmp_path}')
        os.unlink(tmp_path)
			

It uses the Tesseract OCR engine, through the pytesseract wrapper, to extract text from the file. pytesseract's image_to_string works on images, and it worked well for me that way, hence the intermediate step of converting the PDF into one image per page before feeding each page to OCR. Keep in mind that both libraries are thin wrappers: pdf2image needs Poppler and pytesseract needs the tesseract binary installed on the system. The temporary file exists only because convert_from_path needs a path on disk, and we don't want to keep anyone's file on our end. A better option is pdf2image's convert_from_bytes, which works on the uploaded bytes directly and needs no temporary file.
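Here's a minimal sketch of extract_text_from_pdf built on convert_from_bytes. It also moves the blocking work (rasterizing and OCR are both synchronous, CPU-bound calls) into a worker thread with asyncio.to_thread, so a long extraction doesn't stall the event loop that is also serving WebSocket clients. The convert and ocr parameters are a hypothetical addition so the function can be exercised with fakes; by default it falls back to pdf2image.convert_from_bytes and pytesseract.image_to_string.

```python
import asyncio
from typing import Any, Callable, Iterable, Optional

Converter = Callable[[bytes], Iterable[Any]]
OCR = Callable[[Any], str]


def ocr_pdf_bytes(
    pdf_bytes: bytes,
    convert: Optional[Converter] = None,
    ocr: Optional[OCR] = None,
) -> str:
    """Blocking: rasterize the PDF in memory, then OCR every page image."""
    if convert is None:
        from pdf2image import convert_from_bytes  # requires Poppler on the system

        convert = convert_from_bytes
    if ocr is None:
        import pytesseract  # requires the tesseract binary on the system

        ocr = pytesseract.image_to_string
    # Same output shape as the original: each page's text followed by a newline
    return ''.join(ocr(page) + '\n' for page in convert(pdf_bytes))


async def extract_text_from_pdf(
    pdf_bytes: bytes,
    convert: Optional[Converter] = None,
    ocr: Optional[OCR] = None,
) -> str:
    """Run the CPU-bound work in a worker thread so the event loop stays responsive."""
    return await asyncio.to_thread(ocr_pdf_bytes, pdf_bytes, convert, ocr)
```

The call site in app.py stays the same: `text = await extract_text_from_pdf(file_content)`.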

Back in app.py, we also have a WebSocket handler that simply delegates each action to the relevant submodule. Before it does, though, it wraps the connection in a small utility that manages the app's WebSocket connections, so we don't have to worry about sending on connections that aren't ready or are already closed.
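The delegation in websocket_handler is an if/elif chain on data['action'], which grows with every new action. A table-driven variant is one way to keep it flat. This sketch assumes handlers that take only the transaction list (the real ones also receive ws_manager), and fake_analyze/fake_summarize are hypothetical stand-ins for analyze_transactions and summarize_transactions; the reply action names mirror the ones the handler sends today.

```python
import asyncio
from typing import Awaitable, Callable

# A handler takes the transaction list and returns a JSON-serializable dict
Handler = Callable[[list], Awaitable[dict]]


async def dispatch(
    data: dict, routes: dict[str, tuple[Handler, str]]
) -> tuple[str, dict]:
    """Look up data['action'] and return (reply_action, payload)."""
    route = routes.get(data.get('action'))
    if route is None:
        return 'error', {'message': 'Unknown action'}
    handler, done_action = route
    return done_action, await handler(data.get('transactions') or [])


# Hypothetical stand-ins for analyze_transactions / summarize_transactions
async def fake_analyze(transactions: list) -> dict:
    return {'count': len(transactions)}


async def fake_summarize(transactions: list) -> dict:
    return {'total': sum(t['amount'] for t in transactions)}


ROUTES = {
    'analyze': (fake_analyze, 'analysis_complete'),
    'summary': (fake_summarize, 'summary_complete'),
}
```

Adding a new action then means adding one entry to ROUTES instead of another elif branch.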

Step 3: Utility package - `websocket.py`

utils/websocket.py
py
from aiohttp import web

from utils.settings import base_settings


class WebSocketManager:
    def __init__(self, ws: web.WebSocketResponse):
        self.ws = ws
        self._ready = False
        base_settings.logger.info(f'Initializing WebSocket manager: {ws}')

    async def prepare(self):
        self._ready = True
        base_settings.logger.info('WebSocket manager ready')

    async def send_progress(
        self, message: str, progress: float, task_type: str | None = None
    ):
        if not self._ready or self.ws.closed:
            base_settings.logger.warning(
                'Cannot send progress - WebSocket not ready/closed'
            )
            return

        try:
            await self.ws.send_json(
                {
                    'action': 'progress',
                    'message': message,
                    'progress': progress,
                    'taskType': task_type,
                }
            )
        except Exception as e:
            base_settings.logger.error(f'Error sending progress: {str(e)}')

    async def send_result(self, result: dict, task_type: str, action: str):
        if not self._ready or self.ws.closed:
            base_settings.logger.warning(
                'Cannot send result - WebSocket not ready/closed'
            )
            return

        try:
            await self.ws.send_json(
                {
                    'action': action,
                    'result': result,
                    'taskType': task_type,
                }
            )
        except Exception as e:
            base_settings.logger.error(f'Error sending result: {str(e)}')
			

The app would work without it, but I ran into a bug where the frontend was sending requests before connections were ready. So this utility checks that a connection is ready and still open before sending anything on it. It also provides two methods, send_progress and send_result, which use aiohttp's WebSocket send_json to report analysis progress and results, respectively, back to the requester(s), so they aren't kept in the dark when an analysis takes a while (which it will on a CPU-only machine running several transformer models). Once the custom WebSocket manager is prepared, the handler loops over incoming messages and delegates each one based on its action key. For now, we support the analyze and summary actions via analyze_transactions and summarize_transactions, respectively. Let's see what they do.
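To make the wire format concrete: send_progress emits `{"action": "progress", "message", "progress", "taskType"}` and send_result emits `{"action", "result", "taskType"}`, with errors going out as action "error". The real consumer is the TypeScript frontend; this Python sketch, with the hypothetical handle_server_message and its callback names, only shows how a client might route those frames.

```python
import json
from typing import Any, Callable


def handle_server_message(
    raw: str,
    on_progress: Callable[..., None],
    on_result: Callable[..., None],
    on_error: Callable[..., None],
) -> None:
    """Route one text frame from the AI service by its 'action' key."""
    msg: dict[str, Any] = json.loads(raw)
    action = msg.get('action')
    if action == 'progress':
        # From send_progress: progress runs from 0.0 to 1.0
        on_progress(msg.get('taskType'), msg['progress'], msg['message'])
    elif action == 'error':
        # From send_result(..., 'Error', 'error')
        on_error(msg['result'])
    else:
        # From send_result: e.g. 'analysis_complete' or 'summary_complete'
        on_result(action, msg.get('taskType'), msg['result'])
```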

Step 4: Utility package - `analyzer.py`

utils/analyzer.py
py
import asyncio
import os
from datetime import datetime

import numpy as np
import pandas as pd
import torch
from sklearn.ensemble import IsolationForest
from transformers import pipeline

from models.base import Transaction
from utils.settings import base_settings as settings
from utils.websocket import WebSocketManager


def get_device() -> tuple[torch.device, str]:
    """
    Detect the best available device (GPU, MPS, or CPU) for PyTorch computations.
    """
    if torch.cuda.is_available():
        # Check if CUDA (NVIDIA GPU) is available
        return torch.device('cuda'), 'CUDA (NVIDIA GPU)'
    elif torch.backends.mps.is_available():
        # Check if MPS (Metal Performance Shaders on Apple Silicon) is available
        return torch.device('mps'), 'MPS (Apple Metal)'
    else:
        # Default to CPU
        return torch.device('cpu'), 'CPU'


async def analyze_transactions(
    transactions: list[dict], ws_manager: WebSocketManager = None
) -> dict:
    """Analyze transactions and return insights with progress updates."""
    try:
        # Step 1: Validate and preprocess transactions
        if ws_manager:
            await ws_manager.send_progress(
                'Validating transactions...', 0.1, 'Analysis'
            )

        if not transactions:
            if ws_manager:
                await ws_manager.send_progress(
                    'No transactions provided', 1.0, 'Analysis'
                )
            return {'error': 'No transactions provided'}

        tx_objects = [
            Transaction(
                _id=t['_id'],
                balance=float(t['balance']),
                type=t['type'],
                date=datetime.fromisoformat(t['date']),
                description=t['description'],
                amount=float(t['amount']),
                userId=t['userId'],
                createdAt=datetime.fromisoformat(t['createdAt']),
                updatedAt=datetime.fromisoformat(t['updatedAt']),
            )
            for t in transactions
            if validate_transaction(t)
        ]

        if not tx_objects:
            if ws_manager:
                await ws_manager.send_progress(
                    'No valid transactions provided', 1.0, 'Analysis'
                )
            return {'error': 'No valid transactions provided'}

        # Step 2: Classification
        if ws_manager:
            await ws_manager.send_progress(
                'Classifying transactions...', 0.2, 'Analysis'
            )
        categories = await classify_transactions(tx_objects)

        # Step 3: Anomaly Detection
        if ws_manager:
            await ws_manager.send_progress('Detecting anomalies...', 0.4, 'Analysis')
        anomalies = await detect_anomalies(tx_objects)

        # Step 4: Spending Analysis
        if ws_manager:
            await ws_manager.send_progress('Analyzing spending...', 0.6, 'Analysis')
        spending_analysis = await analyze_spending(tx_objects)

        # Step 5: Trend Prediction
        if ws_manager:
            await ws_manager.send_progress(
                'Predicting spending trends...', 0.8, 'Analysis'
            )
        spending_trends = await predict_trends(tx_objects)

        # Compile the results
        result = {
            'categories': categories,
            'anomalies': anomalies,
            'spending_analysis': spending_analysis,
            'spending_trends': spending_trends,
        }

        settings.logger.info('Transaction analysis completed successfully')
        return result

    except Exception as e:
        settings.logger.error(f'Error analyzing transactions: {str(e)}', exc_info=True)
        if ws_manager:
            await ws_manager.send_progress('Analysis failed', 1.0, 'Analysis')
        return {'error': f'Analysis failed: {str(e)}'}


def validate_transaction(t: dict) -> bool:
    """Validate transaction fields."""
    try:
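        # Every key that analyze/summarize read when building Transaction objects
        required_fields = {
            '_id',
            'date',
            'description',
            'amount',
            'balance',
            'type',
            'userId',
            'createdAt',
            'updatedAt',
        }
        if not all(field in t for field in required_fields):
            return False

        # Validate amount and balance are numeric
        float(t['amount'])
        float(t['balance'])

        # Validate every date field that gets parsed later
        for key in ('date', 'createdAt', 'updatedAt'):
            datetime.fromisoformat(t[key])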

        return True
    except (ValueError, TypeError):
        return False


async def classify_transactions(transactions: list[Transaction]) -> dict:
    """
    Classify transactions using FinBERT.
    """
    # Get device (GPU if available, otherwise CPU)
    device, device_name = get_device()
    settings.logger.info(f'Using device for classification: {device_name}')
    # Load FinBERT classification pipeline
    classifier = pipeline(
        'zero-shot-classification',
        model='yiyanghkust/finbert-tone',
        device=device,  # a torch.device works for CUDA, MPS, and CPU alike
    )

    # Define financial categories
    labels = os.getenv(
        'LABELS', 'groceries,housing,transportation,entertainment,utilities,other'
    ).split(',')

    # Prepare transaction descriptions for classification
    descriptions = [tx.description.lower() for tx in transactions]

    # Batch classify descriptions (Improves performance)
    results = await asyncio.to_thread(
        classifier, descriptions, labels, truncation=True, max_length=128
    )

    # Initialize categories and percentages
    categories = {label: 0 for label in labels}

    # Aggregate classification results
    for tx, result in zip(transactions, results):
        category = result['labels'][0]
        categories[category] += abs(tx.amount)

    total_spent = sum(categories.values())

    # Calculate percentages
    percentages = {
        category: (amount / total_spent) * 100 if total_spent > 0 else 0
        for category, amount in categories.items()
    }

    return {'categories': categories, 'percentages': percentages}


async def detect_anomalies(transactions: list[Transaction]) -> list[dict]:
    """Detect anomalies in transactions and provide specific reasons."""
    # Extract transaction amounts and reshape for Isolation Forest
    amounts = np.array([tx.amount for tx in transactions]).reshape(-1, 1)
    model = IsolationForest(contamination=0.05, random_state=42)
    anomalies = model.fit_predict(amounts)

    # Calculate mean and standard deviation for dynamic reason generation
    mean = np.mean(amounts)
    std = np.std(amounts)

    # Detect anomalies and generate specific reasons
    anomaly_details = []
    for tx, anomaly in zip(transactions, anomalies):
        if anomaly == -1:  # Anomaly detected
            # Calculate the Z-score for the transaction
            z_score = (tx.amount - mean) / std

            # Generate a dynamic reason
            if tx.amount > 0:
                reason = (
                    f'Unusually high income of {tx.amount} detected '
                    f'(Z-score: {z_score:.2f}).'
                )
            elif tx.amount < 0 and abs(tx.amount) > abs(mean) + 2 * std:
                reason = (
                    f'Unusually large expense of {tx.amount} detected '
                    f'(Z-score: {z_score:.2f}).'
                )
            elif tx.amount < 0 and 'luxury' in tx.description.lower():
                reason = 'Uncommon luxury expense detected.'
            elif tx.amount < 0 and 'groceries' in tx.description.lower():
                reason = 'Unusually high grocery expense detected.'
            else:
                reason = f'Outlier transaction with amount {tx.amount} (Z-score: {z_score:.2f}).'

            # Append the anomaly details
            anomaly_details.append(
                {
                    'date': tx.date.isoformat(),
                    'description': tx.description,
                    'amount': tx.amount,
                    'reason': reason,
                }
            )

    return anomaly_details


async def analyze_spending(transactions: list[Transaction]) -> dict:
    """Generate spending analysis with cumulative balance"""
    total_spent = sum(tx.amount for tx in transactions if tx.amount < 0)
    total_income = sum(tx.amount for tx in transactions if tx.amount > 0)

    # Create a DataFrame from transactions
    df = pd.DataFrame([t.__dict__ for t in transactions])

    # Ensure date parsing is correct
    df['date'] = pd.to_datetime(df['date'])
    df['date'] = df['date'].dt.tz_localize(None)

    # Group by the date and calculate daily totals
    daily_summary = df.groupby(df['date'].dt.date)['amount'].sum()

    # Sort by date to ensure cumulative calculations are correct
    df = df.sort_values(by='date')

    # The statement's running balance is already cumulative, so reuse it
    df['cumulative_balance'] = df['balance']

    # Convert daily_summary to JSON-serializable format
    daily_summary = {str(date): float(amount) for date, amount in daily_summary.items()}

    # Prepare cumulative balance as JSON-serializable format
    cumulative_balance = {
        row['date'].strftime('%Y-%m-%d'): row['cumulative_balance']
        for _, row in df.iterrows()
    }

    return {
        'total_spent': abs(total_spent),
        'total_income': total_income,
        'savings_rate': (
            ((total_income + total_spent) / total_income) * 100 if total_income else 0
        ),
        'daily_summary': daily_summary,
        'cumulative_balance': cumulative_balance,
    }


async def predict_trends(transactions: list[Transaction]) -> dict:
    """Predict future spending trends with enhanced analysis."""
    if len(transactions) < 2:
        return {'trend': 'Not enough data'}

    # Convert dates to numeric for regression
    dates = [(tx.date - transactions[0].date).days for tx in transactions]
    amounts = [tx.amount for tx in transactions]

    # Linear regression for trends: slope = change in amount per day
    slope = np.polyfit(dates, amounts, 1)[0]
    trend = 'increasing' if slope > 0 else 'decreasing'

    # Estimated monthly spend
    df = pd.DataFrame([t.__dict__ for t in transactions])
    df['date'] = pd.to_datetime(df['date'])
    df['date'] = df['date'].dt.tz_localize(None)
    months = len(df['date'].dt.to_period('M').unique())

    return {
        'trend': trend,
        'trend_slope': slope,
        'estimated_monthly_spend': abs(
            sum(tx.amount for tx in transactions if tx.amount < 0)
        )
        / (months or 1),
    }
			

It's full of async functions that each handle one part of the analysis (setting aside the transaction validation and device detection helpers). The most interesting one is classify_transactions, which uses yiyanghkust/finbert-tone (from "FinBERT: A Large Language Model for Extracting Information from Financial Text") with Hugging Face's zero-shot classification pipeline to sort transactions into categories, mostly these:

.env
txt
LABELS=groceries,school,housing,transportation,gadgets,entertainment,utilities,credit cards,miscellaneous,dining out,healthcare,insurance,savings,investments,childcare,travel,personal care,debts,charity,taxes,subscriptions,streaming services,home maintenance,shopping,pets,fitness,hobbies,gifts
			

Realistically, fine-tuning a base BERT-family (Bidirectional Encoder Representations from Transformers) model such as RoBERTa or DistilBERT on our own data would be more accurate, but my excuse was a lack of adequate data and a GPU (Google Colab wasn't enough).
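Coming back to the zero-shot pipeline that classify_transactions relies on, it helps to see what it does under the hood. transformers frames zero-shot classification as natural language inference (NLI): each description becomes the premise, each label is slotted into a hypothesis template (by default "This example is {}."), the model scores every (premise, hypothesis) pair, and in single-label mode the entailment logits are softmaxed across labels. That is also why the pipeline is designed around NLI checkpoints such as facebook/bart-large-mnli; finbert-tone was fine-tuned for sentiment (positive, negative, neutral), so expect transformers to warn that it cannot find an entailment label, and treat its zero-shot scores with caution. The sketch below reproduces only the pairing and scoring steps in plain Python, with made-up logits standing in for the model.

```python
import math

# transformers' default hypothesis_template for zero-shot classification
DEFAULT_TEMPLATE = 'This example is {}.'


def build_nli_pairs(
    description: str, labels: list[str], template: str = DEFAULT_TEMPLATE
) -> list[tuple[str, str]]:
    """One (premise, hypothesis) pair per candidate label."""
    return [(description, template.format(label)) for label in labels]


def rank_labels(
    labels: list[str], entailment_logits: list[float]
) -> list[tuple[str, float]]:
    """Single-label mode: softmax the entailment logits across labels, best first."""
    top = max(entailment_logits)  # subtract the max for numerical stability
    exps = [math.exp(logit - top) for logit in entailment_logits]
    total = sum(exps)
    scored = [(label, e / total) for label, e in zip(labels, exps)]
    return sorted(scored, key=lambda pair: pair[1], reverse=True)
```

The first entry of the ranked list corresponds to `result['labels'][0]`, which is what classify_transactions uses as each transaction's category.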

Tip: A nice way to collaborate

If you'd like to explore fine-tuning and hands-on Natural Language Processing with transformers, we could collaborate on something like FinBERT, but for account statements and the like. Please reach out.

There are other functions that flag "abnormal" incomes and expenses with IsolationForest and then use the Z-score to generate a human-readable reason for each flag. Cool stuff! The remaining functions, for spending analysis and trend prediction, are pretty basic.
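The Z-score half of that logic is easy to check by hand: z = (x - μ) / σ, where μ and σ are the mean and standard deviation of all amounts. Here's a plain-Python sketch of that step (z_scores and describe_outlier are hypothetical names, and the reasons are simplified to the first two branches of detect_anomalies). It uses the population standard deviation, statistics.pstdev, which matches np.std's default ddof=0, and it adds a guard for the case where every amount is identical and σ is zero, where numpy would yield inf or nan instead. The IsolationForest step itself needs scikit-learn and isn't reproduced here.

```python
from statistics import fmean, pstdev


def z_scores(amounts: list[float]) -> list[float]:
    """Z-score of each amount, using the population std (like np.std, ddof=0)."""
    mean = fmean(amounts)
    std = pstdev(amounts)
    if std == 0:  # every amount is identical, so nothing stands out
        return [0.0 for _ in amounts]
    return [(x - mean) / std for x in amounts]


def describe_outlier(amount: float, z: float) -> str:
    """Human-readable reason, following the positive-income/negative-expense convention."""
    if amount > 0:
        return f'Unusually high income of {amount} detected (Z-score: {z:.2f}).'
    return f'Unusually large expense of {amount} detected (Z-score: {z:.2f}).'
```

For example, with amounts [-50, -50, -50, 150] the mean is 0 and σ is about 86.6, so the 150 income sits about 1.73 standard deviations above the mean.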

Note: amount assumption

All the analysis here assumes that incomes are positive amounts and expenses are negative amounts. If your data follows a different convention, you'll need to adapt either the code or the data.
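If your statements store every amount as a positive number and mark the direction in the type field instead, one option is to normalize the sign before analysis. This sketch assumes hypothetical type values (debit, expense, withdrawal for money going out); adjust EXPENSE_TYPES to whatever your statement parser actually produces.

```python
from typing import Any

# Hypothetical type values for outgoing money; adapt these to your data
EXPENSE_TYPES = {'debit', 'expense', 'withdrawal'}


def normalize_sign(tx: dict[str, Any]) -> dict[str, Any]:
    """Return a copy whose amount is negative for expenses and positive for income."""
    amount = abs(float(tx['amount']))
    is_expense = str(tx.get('type', '')).lower() in EXPENSE_TYPES
    return {**tx, 'amount': -amount if is_expense else amount}
```

You would apply it as `[normalize_sign(t) for t in transactions]` before handing the list to analyze_transactions or summarize_transactions.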

Before we round up, let's look at utils/summarize.py.

Step 5: Utility package - `summarize.py`

utils/summarize.py
py
from datetime import datetime

import pandas as pd

from models.base import Transaction
from utils.analyzer import validate_transaction
from utils.settings import base_settings as settings
from utils.websocket import WebSocketManager


async def summarize_transactions(
    transactions: list[dict], ws_manager: WebSocketManager = None
) -> dict:
    """Summarize transaction data."""
    try:
        # Validate and convert transactions to objects
        if ws_manager:
            await ws_manager.send_progress(
                'Validating transactions...', 0.1, 'Summarize'
            )

        if not transactions:
            if ws_manager:
                await ws_manager.send_progress(
                    'No transactions provided', 1.0, 'Summarize'
                )
            return {'error': 'No transactions provided'}

        tx_objects = [
            Transaction(
                _id=t['_id'],
                balance=float(t['balance']),
                type=t['type'],
                date=datetime.fromisoformat(t['date']),
                description=t['description'],
                amount=float(t['amount']),
                userId=t['userId'],
                createdAt=datetime.fromisoformat(t['createdAt']),
                updatedAt=datetime.fromisoformat(t['updatedAt']),
            )
            for t in transactions
            if validate_transaction(t)
        ]

        if not tx_objects:
            settings.logger.warning('No valid transactions provided')
            if ws_manager:
                await ws_manager.send_progress(
                    'No valid transactions provided', 1.0, 'Summarize'
                )
            return {'error': 'No valid transactions provided'}

        # Step 1: Calculate totals
        if ws_manager:
            await ws_manager.send_progress('Calculating totals...', 0.2, 'Summarize')

        total_spent = sum(tx.amount for tx in tx_objects if tx.amount < 0)
        total_income = sum(tx.amount for tx in tx_objects if tx.amount > 0)

        # Step 2: Calculate additional metrics
        if ws_manager:
            await ws_manager.send_progress('Calculating average..', 0.35, 'Summarize')

        total_savings = total_income + total_spent
        total_transactions = len(tx_objects)
        expense_count = sum(1 for tx in tx_objects if tx.amount < 0)
        income_count = sum(1 for tx in tx_objects if tx.amount > 0)
        avg_expense = abs(total_spent / expense_count) if expense_count > 0 else 0
        avg_income = total_income / income_count if income_count > 0 else 0

        # Step 3: Calculate largest transactions and date range
        if ws_manager:
            await ws_manager.send_progress(
                'Identifying largest transactions...', 0.5, 'Summarize'
            )

        start_date = min(tx.date for tx in tx_objects)
        end_date = max(tx.date for tx in tx_objects)
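        # default=0 avoids a ValueError when there are no expenses (or no income)
        largest_expense = min((tx.amount for tx in tx_objects if tx.amount < 0), default=0)
        largest_income = max((tx.amount for tx in tx_objects if tx.amount > 0), default=0)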

        # Step 4: Generate monthly summaries
        if ws_manager:
            await ws_manager.send_progress(
                'Generating monthly summaries...', 0.7, 'Summarize'
            )

        # The per-month summary is assembled further down from these groupings
        df = pd.DataFrame([t.__dict__ for t in tx_objects])
        df['date'] = pd.to_datetime(df['date'])

        # Step 5: Analyze trends and changes
        if ws_manager:
            await ws_manager.send_progress('Analyzing trends...', 0.85, 'Summarize')

        monthly_income = (
            df[df['amount'] > 0].groupby(df['date'].dt.to_period('M'))['amount'].sum()
        )
        monthly_expense = (
            df[df['amount'] < 0]
            .groupby(df['date'].dt.to_period('M'))['amount']
            .sum()
            .abs()
        )
        # fill_value=0 keeps months that have only income or only expenses
        monthly_savings = monthly_income.sub(monthly_expense, fill_value=0)

        income_trend = await calculate_trend(monthly_income)
        expense_trend = await calculate_trend(monthly_expense)
        savings_trend = await calculate_trend(monthly_savings)

        income_change = await calculate_percentage_change(monthly_income)
        expense_change = await calculate_percentage_change(monthly_expense)
        savings_change = await calculate_percentage_change(monthly_savings)

        savings_rate = (
            ((total_income + total_spent) / total_income) * 100 if total_income else 0
        )

        monthly_summary = {
            str(month): {
                'income': float(monthly_income.get(month, 0)),
                'expenses': float(monthly_expense.get(month, 0)),
                'savings': float(monthly_savings.get(month, 0)),
            }
            for month in monthly_income.index.union(monthly_expense.index)
        }

        # Compile summary results
        summary = {
            'income': {
                'total': total_income,
                'trend': income_trend,
                'change': income_change,
            },
            'expenses': {
                'total': abs(total_spent),
                'trend': expense_trend,
                'change': expense_change,
            },
            'savings': {
                'total': total_savings,
                'trend': savings_trend,
                'change': savings_change,
            },
            'total_transactions': total_transactions,
            'expense_count': expense_count,
            'income_count': income_count,
            'avg_expense': avg_expense,
            'avg_income': avg_income,
            'start_date': start_date.isoformat(),
            'end_date': end_date.isoformat(),
            'largest_expense': largest_expense,
            'largest_income': largest_income,
            'savings_rate': savings_rate,
            'monthly_summary': monthly_summary,
        }
        settings.logger.info('Transaction summarization completed successfully')
        return summary
    except Exception as e:
        settings.logger.error(f'Error summarizing transactions: {str(e)}', exc_info=True)
        if ws_manager:
            await ws_manager.send_progress('Summarization failed', 1.0, 'Summarize')
        return {'error': f'Summarization failed: {str(e)}'}


async def calculate_trend(monthly_data: pd.Series) -> str:
    """
    Calculate trend ('up', 'down', 'neutral') based on monthly data.
    """
    # Need at least one month before the last two to compare against
    if len(monthly_data) < 3:
        return 'neutral'

    recent_avg = monthly_data.iloc[-2:].mean()
    earlier_avg = monthly_data.iloc[:-2].mean()

    if recent_avg > earlier_avg:
        return 'up'
    elif recent_avg < earlier_avg:
        return 'down'
    else:
        return 'neutral'


async def calculate_percentage_change(monthly_data: pd.Series) -> float:
    """
    Calculate the percentage change from the highest monthly value to the average of the last two months.
    """
    if len(monthly_data) < 2:
        return 0

    highest_value = monthly_data.max()
    recent_avg = monthly_data.iloc[-2:].mean()

    return (
        ((recent_avg - highest_value) / highest_value) * 100
        if highest_value != 0
        else 0
    )
			

Though long (mostly because of repeated code that could be factored into reusable functions), the function just computes basic summaries of your transactions. It's fairly comprehensive and easy to extend.
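The monthly figures are the trickiest part, so here's a dependency-free sketch of the same idea using only the standard library (monthly_totals and trend are hypothetical names). It groups signed amounts by YYYY-MM, and a month with expenses but no income ends up with negative savings rather than zero; in pandas, `monthly_income.sub(monthly_expense, fill_value=0)` gives the same result, whereas a plain subtraction followed by fillna(0) would zero out such months. trend mirrors calculate_trend: the mean of the last two months against the mean of all earlier months.

```python
from collections import defaultdict
from datetime import datetime


def monthly_totals(transactions: list[dict]) -> dict[str, dict[str, float]]:
    """Group signed amounts by 'YYYY-MM' into income, expenses, and savings."""
    months: dict[str, dict[str, float]] = defaultdict(
        lambda: {'income': 0.0, 'expenses': 0.0}
    )
    for tx in transactions:
        month = datetime.fromisoformat(tx['date']).strftime('%Y-%m')
        if tx['amount'] > 0:
            months[month]['income'] += tx['amount']
        elif tx['amount'] < 0:
            months[month]['expenses'] += -tx['amount']
    # Savings = income - expenses, so an expense-only month is negative, not 0
    return {
        month: {**totals, 'savings': totals['income'] - totals['expenses']}
        for month, totals in sorted(months.items())
    }


def trend(values: list[float]) -> str:
    """Mean of the last two months vs. the mean of all earlier months."""
    if len(values) < 3:  # nothing earlier to compare against
        return 'neutral'
    recent = sum(values[-2:]) / 2
    earlier = sum(values[:-2]) / (len(values) - 2)
    if recent > earlier:
        return 'up'
    if recent < earlier:
        return 'down'
    return 'neutral'
```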

In both analyses, I endeavored to keep connections updated by sending progress actions at intervals, so the user can see how far along each task is.

Finally, in app.py, init_app creates an Application instance and registers the routes on it (we could have used aiohttp's route decorators, via web.RouteTableDef, instead). It also hooks up the startup and teardown handlers, so the WebSocket connections opened along the way are cleaned up whenever the app shuts down, whatever the interrupt. That's it!

Note: Undiscussed utilities

A few submodules, such as utils/settings.py and models/base.py, weren't discussed. They're basic setup for instrumentation (logging) and the like, and some belong to another part of the system (maybe for another series). All in all, they're very basic.

Outro

Enjoyed this article? I'm a Software Engineer and Technical Writer actively seeking new opportunities, particularly in areas related to web security, finance, healthcare, and education. If you think my expertise aligns with your team's needs, let's chat! You can find me on LinkedIn and X. I am also an email away.

If you found this article valuable, consider sharing it with your network to help spread the knowledge!