Enterprise Data Hub für Leasing-Unternehmen Beispiel: Von isolierten Datensilos zur integrierten Datenplattform

📖 37 Min. Lesezeit
[Data Hub Data Architecture Leasing Power BI Data Science API Integration Data Warehouse Migration]

Enterprise Data Hub für Leasing-Unternehmen: Von isolierten Datensilos zur integrierten Datenplattform

Die Herausforderung: Datensilos in der Leasing-Branche

Leasing- und Finanzierungsunternehmen stehen vor einer besonderen Herausforderung: Ihre Geschäftsprozesse erzeugen Daten über zahlreiche, oft historisch gewachsene Systeme hinweg. Ein typisches mittelständisches Leasing-Unternehmen mit 500 Millionen Euro Leasingvolumen jongliert mit:

  • Kernbanksystem für Vertragsmanagement und Buchhaltung (oft Legacy-Systeme wie SAP Banking oder proprietäre Software)
  • CRM-System für Vertrieb und Kundenbeziehungen (Salesforce, Microsoft Dynamics)
  • Dokumentenmanagementsystem für Verträge, Policen, Korrespondenz
  • Risikomanagement-Tools für Bonitätsprüfung und Portfolio-Analysen
  • Payment-Processing-Systeme für SEPA-Lastschriften und Zahlungsabwicklung
  • Fleet-Management-Systeme (bei Fahrzeug-Leasing)
  • Asset-Management-Systeme für Anlagegüter-Tracking
  • Partner-Portale für Händler und Vertriebspartner

Jedes dieser Systeme produziert wertvolle Daten – aber in isolierten Silos, unterschiedlichen Formaten, mit inkonsistenten Definitionen und ohne zentrale Governance. Die Konsequenzen sind bekannt und schmerzhaft:

Business Impact:

  • Entscheidungen ohne vollständiges Bild: Risk-Manager sehen Bonitätsdaten, aber nicht die aktuelle Zahlungshistorie aus dem Payment-System
  • Reporting-Chaos: Monatliche Management-Reports erfordern manuelle Excel-Konsolidierung aus 8+ Systemen
  • Missed Opportunities: Cross-Selling-Potenziale bleiben unentdeckt, weil Vertriebs- und Vertragsdaten nicht verknüpft sind
  • Compliance-Risiken: IFRS 16 Reporting, BaFin-Meldungen und DSGVO-Anfragen dauern Wochen statt Stunden
  • Ineffizienz: Data Analysts verbringen 70% ihrer Zeit mit Datenaufbereitung statt mit Analysen

Technische Schulden:

  • Point-to-Point-Integrationen führen zu einem unmaintainbaren Integrations-Spaghetti
  • Duplikate und Inkonsistenzen: Ist "Müller GmbH" gleich "Mueller GmbH" gleich "Müller Holding"?
  • Keine Single Source of Truth für Stammdaten
  • Fehlende Historisierung: "Was war der Vertragsstand vor 6 Monaten?" ist unbeantwortbar

Die Lösung: Ein moderner Enterprise Data Hub, der als zentrale Datenplattform fungiert und alle relevanten Daten konsolidiert, standardisiert, anreichert und für verschiedene Konsumenten (BI-Tools, Data Science, APIs) bereitstellt.

Dieser Artikel dokumentiert den Aufbau eines solchen Data Hubs für ein fiktives, aber realistisches Leasing-Unternehmen "FinanceLease AG" – mit 800 Mitarbeitern, 2 Milliarden Euro Leasingvolumen, 45.000 aktiven Verträgen und Geschäftsbereichen in Fahrzeug-Leasing, Equipment-Leasing und Mietkauf. Die beschriebene Architektur, Technologien und Patterns sind direkt auf reale Projekte übertragbar.

Datenquellen: Das heterogene Quellsystem-Ökosystem

Übersicht der Datenquellen bei FinanceLease AG

Ein Data Hub ist nur so gut wie seine Fähigkeit, Daten aus verschiedensten Quellen zu integrieren. Bei FinanceLease AG haben wir folgende Quellsysteme identifiziert:

1. Core Banking System (SAP Banking / Finnova)

Typ: On-Premise SQL Server Database (Legacy) Datenvolumen: 500 GB, 200 Mio. Zeilen über alle Tabellen Kritische Daten:

  • Vertragsdetails (Vertrags-Nr., Laufzeit, Raten, Konditionen)
  • Kundenstammdaten (Firmendaten, Adressen, Kontaktpersonen)
  • Buchungsdaten (Zahlungseingänge, Forderungen, Mahnungen)
  • Asset-Informationen (Fahrzeuge, Maschinen, Equipment-Details)
  • Versicherungsdaten

Technische Charakteristika:

  • Keine native API, nur direkter DB-Zugriff möglich
  • Komplexes, denormalisiertes Schema (über 1.200 Tabellen)
  • Keine Change Data Capture (CDC) – nur Timestamps für Updates
  • Geschäftskritisch: 24/7 Betrieb, Lesezugriffe müssen Performance-neutral sein

Extraktionsstrategie:

-- Incremental Load Beispiel: Neue/geänderte Verträge seit letztem Load
SELECT
    VertragID,
    KundenID,
    Vertragsnummer,
    Leasingobjekt,
    Leasingrate,
    Laufzeit,
    Startdatum,
    Enddatum,
    Status,
    LastModifiedDate
FROM Vertraege
WHERE LastModifiedDate > @LastExtractTimestamp
    OR CreatedDate > @LastExtractTimestamp

Herausforderungen:

  • Legacy-Encoding (ISO-8859-1 statt UTF-8) → Umlaute-Probleme
  • Inkonsistente Löschlogik: Soft-Deletes nicht einheitlich implementiert
  • Historisierung fehlt: Vertrags-Änderungen überschreiben alte Werte
  • Performance: Full-Table-Scans bei fehlenden Indizes

2. CRM-System (Salesforce)

Typ: Cloud SaaS, REST API Datenvolumen: 120.000 Leads, 35.000 Opportunities, 50.000 Accounts Kritische Daten:

  • Sales Pipeline (Opportunities, Stages, Forecast)
  • Lead-Daten und Konversionstracking
  • Account-Hierarchien (Konzernstrukturen)
  • Aktivitäten (Calls, Meetings, E-Mails)
  • Custom Objects für Leasing-spezifische Workflows

Technische Charakteristika:

  • Salesforce REST API v58.0
  • Bulk API für große Datenmengen (> 10.000 Records)
  • API-Limits: 100.000 Calls/24h (wird bei großen Extracts schnell kritisch)
  • Webhook-Support für Echtzeit-Notifikationen

Extraktionsstrategie:

# Salesforce Bulk API Extraktion
from simple_salesforce import Salesforce, SalesforceBulk

sf = Salesforce(username=SF_USER, password=SF_PASS, security_token=SF_TOKEN)
bulk = SalesforceBulk(sessionId=sf.session_id, host=sf.sf_instance)

# Bulk Query für große Datenmengen
query = """
    SELECT Id, Name, Amount, StageName, CloseDate, LeaseType__c,
           ContractTerm__c, LastModifiedDate
    FROM Opportunity
    WHERE LastModifiedDate > LAST_N_DAYS:1
"""

job = bulk.create_query_job("Opportunity", contentType='JSON')
batch = bulk.query(job, query)
bulk.close_job(job)

# Ergebnisse abholen
results = bulk.get_all_results_for_query_batch(batch)
opportunities = [record for result in results for record in result]

Herausforderungen:

  • API-Rate-Limiting erfordert intelligente Batch-Steuerung
  • Custom Fields folgen Naming Convention: FieldName__c
  • Lookup-Relationships müssen aufgelöst werden (z.B. Account-Hierarchien)
  • Zeitzone-Handling: Salesforce UTC, Core Banking CET

3. Payment Processing System (SEPA Gateway)

Typ: REST API + SFTP für Batch-Files Datenvolumen: 1.5 Mio. Transaktionen/Jahr Kritische Daten:

  • SEPA-Lastschriften (mandats, status, rejection reasons)
  • Zahlungseingänge und -ausgänge
  • Bank-Connection-Status
  • Return-Debit-Informationen

Technische Charakteristika:

  • Real-time API für Transaktions-Status
  • Täglich SFTP-Upload: PAIN.008 XML (SEPA Lastschrift-Einreichung)
  • Täglich SFTP-Download: CAMT.053 XML (Kontoauszüge)
  • Webhook für Payment-Status-Changes

Extraktionsstrategie:

// CAMT.053 XML Parsing (ISO 20022 Standard)
public class CAMT053Parser
{
    public List<PaymentTransaction> ParseBankStatement(string xmlContent)
    {
        var doc = XDocument.Parse(xmlContent);
        XNamespace ns = "urn:iso:std:iso:20022:tech:xsd:camt.053.001.02";

        var transactions = doc.Descendants(ns + "Ntry")
            .Select(entry => new PaymentTransaction
            {
                BookingDate = DateTime.Parse(entry.Element(ns + "BookgDt")
                    ?.Element(ns + "Dt")?.Value),
                Amount = decimal.Parse(entry.Element(ns + "Amt")?.Value),
                Currency = entry.Element(ns + "Amt")?.Attribute("Ccy")?.Value,
                DebtorName = entry.Descendants(ns + "Dbtr")
                    .FirstOrDefault()?.Element(ns + "Nm")?.Value,
                DebtorIBAN = entry.Descendants(ns + "DbtrAcct")
                    .FirstOrDefault()?.Element(ns + "Id")
                    ?.Element(ns + "IBAN")?.Value,
                Reference = entry.Descendants(ns + "RmtInf")
                    .FirstOrDefault()?.Element(ns + "Ustrd")?.Value,
                Status = MapBookingStatus(entry.Element(ns + "Sts")?.Value)
            })
            .ToList();

        return transactions;
    }
}

Herausforderungen:

  • XML-Parsing von ISO 20022 Standards (komplex, verschachtelt)
  • Reconciliation: Matching von SEPA-Lastschriften zu Verträgen via Referenz
  • Late Rejections: Rückläufer können bis zu 8 Wochen nach Einzug kommen
  • Duplicate Detection: Wiederholte Downloads derselben Dateien

4. Fleet Management System (Eigensystem)

Typ: PostgreSQL Database + REST API Datenvolumen: 15.000 Fahrzeuge, 2 Mio. Telematik-Events/Tag Kritische Daten:

  • Fahrzeugstammdaten (Marke, Modell, VIN, Erstzulassung)
  • Standortdaten (GPS-Tracking)
  • Kilometer-Stände
  • Wartungs- und Reparaturhistorie
  • Schaden-Meldungen

Technische Charakteristika:

  • PostgreSQL 15 mit PostGIS Extension (Geo-Daten)
  • REST API für CRUD-Operationen
  • Event-Stream via Apache Kafka für Telematik-Daten (Echtzeit)
  • S3-Bucket für Fahrzeug-Bilder und Schadenfotos

Extraktionsstrategie:

# Kafka Consumer für Telematik-Stream
from kafka import KafkaConsumer
import json

consumer = KafkaConsumer(
    'vehicle-telemetry',
    bootstrap_servers=['kafka-broker:9092'],
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),
    auto_offset_reset='earliest',
    enable_auto_commit=True,
    group_id='data-hub-consumer'
)

for message in consumer:
    telemetry_event = message.value
    # {
    #   "vehicle_id": "VIN12345",
    #   "timestamp": "2025-01-15T10:30:00Z",
    #   "latitude": 52.520008,
    #   "longitude": 13.404954,
    #   "mileage": 45230,
    #   "fuel_level": 45.5,
    #   "battery_voltage": 12.6
    # }

    # Stream to Data Hub (z.B. Azure Event Hub oder Kafka Topic)
    send_to_data_hub(telemetry_event)

Herausforderungen:

  • Hohe Event-Frequenz: 2 Mio. Events/Tag = 23 Events/Sekunde durchschnittlich
  • Geo-Daten erfordern spezielle Handling (nicht standard SQL)
  • Image-Daten: Referenzen vs. Blob-Storage-Integration

5. Externe Datenquellen

Bonitätsdaten (SCHUFA API):

  • REST API, Rate-Limit: 1000 Calls/Stunde
  • On-Demand-Abfragen bei Neuverträgen
  • Historische Scores werden im Data Hub persistiert

Fahrzeugbewertungen (DAT/Schwacke API):

  • REST API für Restwert-Prognosen
  • Batch-Updates: Monatliche Neubewertung des gesamten Portfolios
  • CSV-Export-Option für Bulk-Daten

Makroökonomische Daten (Bundesbank API):

  • Zinssätze, Inflationsraten für Risk-Modelle
  • Open Data, keine Authentication
  • Täglich aktualisierte Zeitreihen

Datenquellen-Übersicht: Zusammenfassung

Quelle Typ Volumen Update-Frequenz Zugriffsmethode Kritikalität
Core Banking On-Prem DB 500 GB Real-time DB Query Critical
CRM (Salesforce) Cloud API 50K Accounts Real-time REST API High
Payment Processing Hybrid 1.5M tx/Jahr Täglich + RT SFTP + API Critical
Fleet Management On-Prem DB 15K Vehicles Real-time Stream Kafka + API High
SCHUFA External API On-Demand On-Demand REST API High
DAT/Schwacke External API Batch Monatlich REST API Medium
Bundesbank Open Data Zeitreihen Täglich REST API Low

Wichtige Erkenntnisse für die Data-Hub-Architektur:

  1. Heterogenität: Mix aus Legacy-Datenbanken, moderne Cloud-APIs, File-Drops und Event-Streams
  2. Latenz-Anforderungen: Von Echtzeit (Telematik) bis Batch (Bewertungen)
  3. Volumen-Varianz: Von wenigen KB (Makrodaten) bis GB-Range (Core Banking)
  4. Kritikalität: Payment und Core Banking sind geschäftskritisch → Fehlertoleranz essentiell
  5. Governance: Externe APIs haben Rate-Limits und Kosten → intelligente Caching-Strategie nötig

Zentrale Data Hub Komponenten: Die Architektur

Ein moderner Data Hub für ein Leasing-Unternehmen folgt einer mehrschichtigen Architektur, die verschiedene Datenverarbeitungs-Paradigmen unterstützt: Batch, Streaming, und On-Demand-Processing. Die Architektur orientiert sich an der Medallion-Architektur (Bronze-Silver-Gold), die sich in Data-Lake-Szenarien bewährt hat.

High-Level Architekturübersicht

┌─────────────────────────────────────────────────────────────────────────┐
│                          DATA SOURCES                                    │
│  Core Banking │ Salesforce │ Payment │ Fleet Mgmt │ External APIs       │
└─────────────┬───────────────────────────────────────────────────────────┘
              │
              ▼
┌─────────────────────────────────────────────────────────────────────────┐
│                     INGESTION LAYER                                      │
│  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐                  │
│  │ Azure Data   │  │ Azure Event  │  │ Azure        │                  │
│  │ Factory      │  │ Hub          │  │ Functions    │                  │
│  │ (Batch/CDC)  │  │ (Streaming)  │  │ (API Pulls)  │                  │
│  └──────────────┘  └──────────────┘  └──────────────┘                  │
└─────────────┬───────────────────────────────────────────────────────────┘
              │
              ▼
┌─────────────────────────────────────────────────────────────────────────┐
│                    BRONZE LAYER (RAW DATA)                               │
│           Azure Data Lake Gen2 / Delta Lake                              │
│  • Raw data, as-is from sources                                         │
│  • Partitioned by source_system / ingestion_date                        │
│  • Immutable, append-only                                               │
└─────────────┬───────────────────────────────────────────────────────────┘
              │
              ▼
┌─────────────────────────────────────────────────────────────────────────┐
│                 PROCESSING LAYER (ETL/ELT)                               │
│  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐                  │
│  │ Azure        │  │ Databricks   │  │ Azure        │                  │
│  │ Synapse      │  │ Spark        │  │ Stream       │                  │
│  │ Pipelines    │  │ (Complex)    │  │ Analytics    │                  │
│  └──────────────┘  └──────────────┘  └──────────────┘                  │
└─────────────┬───────────────────────────────────────────────────────────┘
              │
              ▼
┌─────────────────────────────────────────────────────────────────────────┐
│              SILVER LAYER (CLEANED & VALIDATED)                          │
│           Azure Data Lake Gen2 / Delta Lake                              │
│  • Cleaned, validated, standardized                                     │
│  • Schema enforcement                                                   │
│  • De-duplication, data quality checks                                  │
└─────────────┬───────────────────────────────────────────────────────────┘
              │
              ▼
┌─────────────────────────────────────────────────────────────────────────┐
│        GOLD LAYER (BUSINESS-LEVEL AGGREGATES)                            │
│      Azure Synapse Dedicated SQL Pool / Delta Lake                      │
│  • Business entities (Customer 360°, Contract View)                     │
│  • Pre-aggregated KPIs                                                  │
│  • Dimensional models (Star Schema)                                     │
└─────────────┬───────────────────────────────────────────────────────────┘
              │
              ▼
┌─────────────────────────────────────────────────────────────────────────┐
│                    CONSUMPTION LAYER                                     │
│  ┌─────────┐  ┌─────────┐  ┌─────────┐  ┌─────────┐                   │
│  │ Power BI│  │ Azure ML│  │ REST API│  │ External│                   │
│  │ Reports │  │ Models  │  │ (Apps)  │  │ Tools   │                   │
│  └─────────┘  └─────────┘  └─────────┘  └─────────┘                   │
└─────────────────────────────────────────────────────────────────────────┘

         Cross-Cutting Concerns:
         • Azure Purview (Data Governance, Catalog)
         • Azure Key Vault (Secrets Management)
         • Azure Monitor + App Insights (Monitoring)
         • Azure Active Directory (Authentication/Authorization)

Komponente 1: Ingestion Layer

Technologie-Stack:

  • Azure Data Factory für Batch-Ingestion (Datenbanken, APIs mit Bulk-Export)
  • Azure Event Hub für Streaming-Ingestion (Kafka-kompatibel, Telematik-Daten)
  • Azure Functions für On-Demand API-Calls (z.B. SCHUFA bei Vertragsabschluss)

Azure Data Factory Pipeline Beispiel:

{
  "name": "IngestCoreBankingContracts",
  "properties": {
    "activities": [
      {
        "name": "CopyContractsIncremental",
        "type": "Copy",
        "inputs": [
          {
            "referenceName": "SourceSQLServer",
            "type": "DatasetReference",
            "parameters": {
              "tableName": "Vertraege",
              "watermarkColumn": "LastModifiedDate",
              "watermarkValue": "@pipeline().parameters.LastWatermark"
            }
          }
        ],
        "outputs": [
          {
            "referenceName": "BronzeDataLake",
            "type": "DatasetReference",
            "parameters": {
              "folderPath": "bronze/core_banking/contracts/@{formatDateTime(utcnow(),'yyyy/MM/dd')}",
              "fileName": "contracts_@{formatDateTime(utcnow(),'yyyyMMdd_HHmmss')}.parquet"
            }
          }
        ],
        "typeProperties": {
          "source": {
            "type": "SqlServerSource",
            "sqlReaderQuery": {
              "value": "SELECT * FROM Vertraege WHERE LastModifiedDate > '@{pipeline().parameters.LastWatermark}'",
              "type": "Expression"
            }
          },
          "sink": {
            "type": "ParquetSink",
            "storeSettings": {
              "type": "AzureBlobFSWriteSettings"
            },
            "formatSettings": {
              "type": "ParquetWriteSettings",
              "compressionCodec": "snappy"
            }
          },
          "enableStaging": false,
          "translator": {
            "type": "TabularTranslator",
            "mappings": [
              {
                "source": { "name": "VertragID" },
                "sink": { "name": "contract_id", "type": "Int64" }
              },
              {
                "source": { "name": "Vertragsnummer" },
                "sink": { "name": "contract_number", "type": "String" }
              }
            ]
          }
        }
      },
      {
        "name": "UpdateWatermark",
        "type": "SqlServerStoredProcedure",
        "dependsOn": [
          {
            "activity": "CopyContractsIncremental",
            "dependencyConditions": [ "Succeeded" ]
          }
        ],
        "typeProperties": {
          "storedProcedureName": "usp_UpdateWatermark",
          "storedProcedureParameters": {
            "TableName": { "value": "Vertraege" },
            "WatermarkValue": { "value": "@{utcnow()}" }
          }
        }
      }
    ],
    "parameters": {
      "LastWatermark": {
        "type": "String",
        "defaultValue": "1900-01-01T00:00:00Z"
      }
    }
  }
}

Design Principles:

  • Idempotenz: Wiederholte Ausführung derselben Pipeline mit denselben Parametern produziert identische Ergebnisse
  • Watermarking: Incremental Load via Timestamps vermeidet Full-Table-Scans
  • Partitionierung: Daten nach Datum partitioniert für effiziente Queries und Retention-Management
  • Fehlertoleranz: Bei Fehler wird Wassermarke nicht aktualisiert → nächster Lauf holt fehlende Daten nach

Komponente 2: Bronze Layer (Raw Data)

Technologie: Azure Data Lake Storage Gen2 mit Delta Lake Format

Charakteristika:

  • Immutable: Daten werden nie verändert, nur hinzugefügt (Append-Only)
  • Schema-on-Read: Flexibles Schema, keine Enforcement
  • Vollständige Historie: Alle ingested Daten bleiben erhalten (subject to retention policy)
  • Partitionierung: Nach source_system, ingestion_date

Beispiel-Struktur:

bronze/
├── core_banking/
│   ├── contracts/
│   │   ├── year=2025/
│   │   │   ├── month=01/
│   │   │   │   ├── day=15/
│   │   │   │   │   ├── contracts_20250115_080000.parquet
│   │   │   │   │   └── contracts_20250115_200000.parquet
│   ├── customers/
│   ├── payments/
├── salesforce/
│   ├── opportunities/
│   ├── accounts/
│   ├── leads/
├── fleet_management/
│   ├── vehicles/
│   ├── telemetry/ (partitioned by hour for high-frequency data)

Delta Lake Beispiel:

from delta.tables import DeltaTable
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("BronzeIngestion") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()

# Read from source (z.B. Kafka)
raw_df = spark.read \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka-broker:9092") \
    .option("subscribe", "vehicle-telemetry") \
    .load()

# Schema-Definition (optional, für Validierung)
from pyspark.sql.types import *

schema = StructType([
    StructField("vehicle_id", StringType(), False),
    StructField("timestamp", TimestampType(), False),
    StructField("latitude", DoubleType(), True),
    StructField("longitude", DoubleType(), True),
    StructField("mileage", IntegerType(), True),
    StructField("fuel_level", DoubleType(), True)
])

# Parse JSON
from pyspark.sql.functions import from_json, col, current_timestamp

telemetry_df = raw_df.select(
    from_json(col("value").cast("string"), schema).alias("data"),
    col("timestamp").alias("kafka_timestamp")
).select("data.*", "kafka_timestamp") \
 .withColumn("ingestion_timestamp", current_timestamp())

# Write to Bronze (Delta Lake)
telemetry_df.write \
    .format("delta") \
    .mode("append") \
    .partitionBy("year", "month", "day", "hour") \
    .save("abfss://bronze@datalake.dfs.core.windows.net/fleet_management/telemetry")

Komponente 3: Silver Layer (Cleaned & Validated)

Technologie: Delta Lake + Azure Databricks für komplexe Transformationen

Transformationen:

  1. Data Quality Checks

    • Null-Checks für Pflichtfelder
    • Range-Validierung (z.B. Leasingrate > 0)
    • Referential Integrity (z.B. existiert KundenID in Kundenstamm?)
  2. Standardisierung

    • Einheitliche Datumsformate (ISO 8601)
    • Währungskonvertierung (alle Beträge in EUR)
    • Adress-Normalisierung
  3. De-Duplication

    • Window-Functions für Duplikat-Erkennung
    • Merge-Logik für "Same Contract, Multiple Sources"
  4. Enrichment

    • Lookup zu Referenzdaten
    • Berechnete Felder (z.B. "Days Until Contract End")

Beispiel: Contract Silver Layer Processing

from pyspark.sql import functions as F
from pyspark.sql.window import Window
from delta.tables import DeltaTable

# Read Bronze
bronze_contracts = spark.read \
    .format("delta") \
    .load("abfss://bronze@datalake.dfs.core.windows.net/core_banking/contracts")

# Data Quality: Filter out invalid records
valid_contracts = bronze_contracts.filter(
    (F.col("contract_number").isNotNull()) &
    (F.col("contract_number") != "") &
    (F.col("lease_rate") > 0) &
    (F.col("start_date") <= F.col("end_date"))
)

# De-Duplication: Nehme neuesten Record pro contract_number
window_spec = Window.partitionBy("contract_number").orderBy(F.desc("LastModifiedDate"))
deduped_contracts = valid_contracts.withColumn("row_num", F.row_number().over(window_spec)) \
    .filter(F.col("row_num") == 1) \
    .drop("row_num")

# Standardization
standardized_contracts = deduped_contracts \
    .withColumn("start_date", F.to_date("start_date", "yyyy-MM-dd")) \
    .withColumn("end_date", F.to_date("end_date", "yyyy-MM-dd")) \
    .withColumn("lease_rate", F.round("lease_rate", 2)) \
    .withColumn("currency", F.lit("EUR"))

# Enrichment: Calculate derived fields
enriched_contracts = standardized_contracts \
    .withColumn("contract_duration_months",
                F.months_between("end_date", "start_date")) \
    .withColumn("days_until_end",
                F.datediff("end_date", F.current_date())) \
    .withColumn("contract_status",
                F.when(F.col("days_until_end") < 0, "EXPIRED")
                 .when(F.col("days_until_end") <= 90, "ENDING_SOON")
                 .otherwise("ACTIVE"))

# Write to Silver (with MERGE for upserts)
silver_path = "abfss://silver@datalake.dfs.core.windows.net/contracts"

if DeltaTable.isDeltaTable(spark, silver_path):
    silver_table = DeltaTable.forPath(spark, silver_path)

    silver_table.alias("target").merge(
        enriched_contracts.alias("source"),
        "target.contract_id = source.contract_id"
    ).whenMatchedUpdateAll() \
     .whenNotMatchedInsertAll() \
     .execute()
else:
    enriched_contracts.write \
        .format("delta") \
        .mode("overwrite") \
        .save(silver_path)

# Data Quality Metrics
dq_metrics = {
    "total_bronze_records": bronze_contracts.count(),
    "valid_records": valid_contracts.count(),
    "invalid_records": bronze_contracts.count() - valid_contracts.count(),
    "duplicates_removed": valid_contracts.count() - deduped_contracts.count(),
    "final_silver_records": enriched_contracts.count()
}

# Log metrics
print(f"Data Quality Metrics: {dq_metrics}")

Komponente 4: Gold Layer (Business-Level)

Technologie: Azure Synapse Dedicated SQL Pool + Delta Lake

Charakteristika:

  • Business Entities: Nicht technische Tabellen, sondern Business-Konzepte
  • Dimensional Modeling: Star Schema für optimale BI-Performance
  • Pre-Aggregated KPIs: Für schnelle Dashboard-Performance
  • Historisierung: Slowly Changing Dimensions (SCD Type 2)

Beispiel: Customer 360° View

-- Gold Layer: Customer 360° Dimensional Model

-- Dimension: Customer
CREATE TABLE gold.DimCustomer (
    CustomerKey INT IDENTITY(1,1) PRIMARY KEY,
    CustomerID VARCHAR(50) NOT NULL,
    CustomerNumber VARCHAR(50),
    CompanyName NVARCHAR(200),
    Industry NVARCHAR(100),
    EmployeeCount INT,
    AnnualRevenue DECIMAL(18,2),
    CreditRating VARCHAR(10),

    -- SCD Type 2 fields
    EffectiveFrom DATE NOT NULL,
    EffectiveTo DATE,
    IsCurrent BIT NOT NULL DEFAULT 1,

    -- Audit
    CreatedDate DATETIME2 DEFAULT GETUTCDATE(),
    ModifiedDate DATETIME2 DEFAULT GETUTCDATE()
);

-- Dimension: Contract
CREATE TABLE gold.DimContract (
    ContractKey INT IDENTITY(1,1) PRIMARY KEY,
    ContractID VARCHAR(50) NOT NULL,
    ContractNumber VARCHAR(50) NOT NULL,
    ContractType NVARCHAR(50), -- 'Leasing', 'Mietkauf'
    AssetType NVARCHAR(50), -- 'Vehicle', 'Equipment'
    AssetDescription NVARCHAR(500),
    LeaseRate DECIMAL(18,2),
    ContractDurationMonths INT,
    StartDate DATE,
    EndDate DATE,
    Status NVARCHAR(50), -- 'ACTIVE', 'ENDING_SOON', 'EXPIRED'

    -- Audit
    CreatedDate DATETIME2 DEFAULT GETUTCDATE(),
    ModifiedDate DATETIME2 DEFAULT GETUTCDATE()
);

-- Fact: Contract Transactions
CREATE TABLE gold.FactContractTransaction (
    TransactionKey BIGINT IDENTITY(1,1) PRIMARY KEY,
    DateKey INT NOT NULL, -- FK to DimDate
    CustomerKey INT NOT NULL, -- FK to DimCustomer
    ContractKey INT NOT NULL, -- FK to DimContract

    TransactionType NVARCHAR(50), -- 'PAYMENT', 'INVOICE', 'ADJUSTMENT'
    Amount DECIMAL(18,2),
    Currency CHAR(3),
    PaymentMethod NVARCHAR(50),
    PaymentStatus NVARCHAR(50),

    -- Metrics
    DaysOverdue INT,
    LateFeeAmount DECIMAL(18,2),

    TransactionDate DATE,
    CreatedDate DATETIME2 DEFAULT GETUTCDATE()
);

-- Aggregated KPI Table
CREATE TABLE gold.ContractKPIs (
    SnapshotDate DATE NOT NULL,
    ContractType NVARCHAR(50),

    TotalActiveContracts INT,
    TotalContractValue DECIMAL(18,2),
    AverageLeaseRate DECIMAL(18,2),

    PaymentsOnTime INT,
    PaymentsOverdue INT,
    PaymentSuccessRate DECIMAL(5,2),

    AverageContractDuration INT,
    ContractsEndingNext90Days INT,

    PRIMARY KEY (SnapshotDate, ContractType)
);

Materialized View für Customer 360°:

CREATE VIEW gold.vw_Customer360 AS
SELECT
    c.CustomerKey,
    c.CompanyName,
    c.Industry,
    c.CreditRating,

    -- Contract Metrics
    COUNT(DISTINCT ct.ContractKey) AS TotalContracts,
    SUM(CASE WHEN ct.Status = 'ACTIVE' THEN 1 ELSE 0 END) AS ActiveContracts,
    SUM(ct.LeaseRate) AS TotalMonthlyLeaseAmount,

    -- Payment Metrics
    SUM(CASE WHEN ft.PaymentStatus = 'PAID_ONTIME' THEN 1 ELSE 0 END) AS PaymentsOnTime,
    SUM(CASE WHEN ft.PaymentStatus = 'OVERDUE' THEN 1 ELSE 0 END) AS PaymentsOverdue,
    SUM(ft.LateFeeAmount) AS TotalLateFees,

    -- Salesforce Metrics
    COUNT(DISTINCT opp.OpportunityID) AS TotalOpportunities,
    SUM(opp.Amount) AS PipelineValue,

    -- Risk Indicators
    MAX(c.CreditRating) AS CurrentCreditRating,
    AVG(CAST(ft.DaysOverdue AS FLOAT)) AS AvgDaysOverdue,

    -- Last Activity
    MAX(ft.TransactionDate) AS LastPaymentDate,
    MAX(opp.LastActivityDate) AS LastSalesActivity

FROM gold.DimCustomer c
LEFT JOIN gold.DimContract ct ON c.CustomerID = ct.CustomerID
LEFT JOIN gold.FactContractTransaction ft ON ct.ContractKey = ft.ContractKey
LEFT JOIN gold.DimOpportunity opp ON c.CustomerID = opp.AccountID

WHERE c.IsCurrent = 1 -- Only current customer version (SCD Type 2)

GROUP BY
    c.CustomerKey,
    c.CompanyName,
    c.Industry,
    c.CreditRating;

Komponente 5: Metadata & Governance (Azure Purview)

Azure Purview fungiert als zentraler Data Catalog und Governance-Layer:

  • Data Discovery: Durchsuchbarer Katalog aller Datasets im Data Hub
  • Lineage Tracking: "Wo kommt dieser Wert her?" – vom Quellsystem bis zum Report
  • Data Classification: Automatische Erkennung von PII, Finanzdaten, etc.
  • Business Glossary: Zentrale Definitionen (z.B. "Was ist 'Leasingrate'?")

Beispiel: Lineage für "Total Contract Value" in Power BI Report

Power BI Report: "Management Dashboard"
    ↓ Measures: [Total Contract Value]
        ↓ Source: Gold.ContractKPIs.TotalContractValue
            ↓ Calculated in: gold_kpi_aggregation_pipeline
                ↓ Source: Silver.Contracts.LeaseRate
                    ↓ Transformation: silver_contract_enrichment
                        ↓ Source: Bronze.CoreBanking.Contracts.Leasingrate
                            ↓ Ingestion: adf_core_banking_contracts_pipeline
                                ↓ Original Source: CoreBankingDB.dbo.Vertraege.Leasingrate

Diese Transparenz ist kritisch für:

  • Compliance: DSGVO-Anfragen (Wo sind Daten der Person X?)
  • Impact Analysis: "Wenn ich Tabelle Y ändere, welche Reports brechen?"
  • Trust: Business-User sehen, wo Zahlen herkommen

Modern Reporting mit Power BI: Von Daten zu Insights

Power BI ist die primäre Konsumenten-Schicht für Business-User bei FinanceLease AG. Die Integration mit dem Azure-basierten Data Hub ermöglicht performantes, self-service-fähiges Reporting mit Echtzeit-Updates.

Architektur: Power BI ↔ Data Hub Integration

┌──────────────────────────────────────────────────────────────────┐
│                      POWER BI SERVICE                             │
│                                                                   │
│  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐           │
│  │ Management   │  │ Risk &       │  │ Operations   │           │
│  │ Dashboard    │  │ Compliance   │  │ Dashboard    │           │
│  └──────────────┘  └──────────────┘  └──────────────┘           │
│                                                                   │
│  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐           │
│  │ Sales        │  │ Finance      │  │ Fleet        │           │
│  │ Analytics    │  │ KPIs         │  │ Management   │           │
│  └──────────────┘  └──────────────┘  └──────────────┘           │
└─────────────┬────────────────────────────────────────────────────┘
              │
              ▼
┌──────────────────────────────────────────────────────────────────┐
│               DATA CONNECTION OPTIONS                             │
│                                                                   │
│  Option 1: DirectQuery (Echtzeit, limitierte Transformationen)   │
│  ┌─────────────────────────────────────────────────┐             │
│  │ Azure Synapse SQL Pool (Gold Layer)             │             │
│  │ • Sub-second query performance                  │             │
│  │ • Always up-to-date                             │             │
│  │ • Row-Level Security enforced in DB             │             │
│  └─────────────────────────────────────────────────┘             │
│                                                                   │
│  Option 2: Import (Fast, full DAX, scheduled refresh)            │
│  ┌─────────────────────────────────────────────────┐             │
│  │ Azure Synapse / Pre-aggregated Tables           │             │
│  │ • In-memory compression                         │             │
│  │ • Complex DAX calculations                      │             │
│  │ • Scheduled refresh (hourly/daily)              │             │
│  └─────────────────────────────────────────────────┘             │
│                                                                   │
│  Option 3: Composite (Best of both)                              │
│  ┌─────────────────────────────────────────────────┐             │
│  │ Import for Dimensions, DirectQuery for Facts    │             │
│  │ • Performance + Freshness                       │             │
│  └─────────────────────────────────────────────────┘             │
└──────────────────────────────────────────────────────────────────┘

Beispiel 1: Management Dashboard

Use Case: C-Level Executives benötigen täglich aktuelle KPIs zu Portfolio-Performance, Risk-Metriken, und Sales-Pipeline.

Power BI Dataset Configuration:

// Measures.dax - Zentrale Business-Metriken

// ==================== Portfolio Metrics ====================

[Total Active Contracts] :=
CALCULATE(
    COUNTROWS(DimContract),
    DimContract[Status] = "ACTIVE"
)

[Total Contract Value] :=
CALCULATE(
    SUM(DimContract[LeaseRate]) * [Average Contract Duration],
    DimContract[Status] IN {"ACTIVE", "ENDING_SOON"}
)

[Portfolio Growth YoY] :=
VAR CurrentYear = [Total Contract Value]
VAR PreviousYear =
    CALCULATE(
        [Total Contract Value],
        SAMEPERIODLASTYEAR(DimDate[Date])
    )
RETURN
    DIVIDE(CurrentYear - PreviousYear, PreviousYear, 0)

// ==================== Payment Performance ====================

[Payment Success Rate] :=
VAR OnTimePayments =
    CALCULATE(
        COUNTROWS(FactContractTransaction),
        FactContractTransaction[PaymentStatus] = "PAID_ONTIME"
    )
VAR TotalPayments = COUNTROWS(FactContractTransaction)
RETURN
    DIVIDE(OnTimePayments, TotalPayments, 0)

[Overdue Amount] :=
CALCULATE(
    SUM(FactContractTransaction[Amount]),
    FactContractTransaction[PaymentStatus] = "OVERDUE"
)

[Days Sales Outstanding (DSO)] :=
VAR AvgDailyRevenue =
    CALCULATE(
        [Total Contract Value] / 365,
        DimContract[Status] = "ACTIVE"
    )
VAR OutstandingReceivables =
    CALCULATE(
        SUM(FactContractTransaction[Amount]),
        FactContractTransaction[PaymentStatus] IN {"PENDING", "OVERDUE"}
    )
RETURN
    DIVIDE(OutstandingReceivables, AvgDailyRevenue, 0)

// ==================== Risk Indicators ====================

[High-Risk Contracts %] :=
VAR HighRiskContracts =
    CALCULATE(
        COUNTROWS(DimContract),
        DimContract[Status] = "ACTIVE",
        RELATED(DimCustomer[CreditRating]) IN {"D", "E"}
    )
VAR TotalActiveContracts = [Total Active Contracts]
RETURN
    DIVIDE(HighRiskContracts, TotalActiveContracts, 0)

[Expected Loss] :=
// Simplified Expected Loss Calculation
VAR ExposureAtDefault =
    SUMX(
        FILTER(DimContract, DimContract[Status] = "ACTIVE"),
        DimContract[LeaseRate] * DimContract[ContractDurationMonths]
    )
VAR ProbabilityOfDefault = [High-Risk Contracts %] * 0.15 // Simplified
VAR LossGivenDefault = 0.45 // Industry standard
RETURN
    ExposureAtDefault * ProbabilityOfDefault * LossGivenDefault

// ==================== Sales Pipeline ====================

[Total Pipeline Value] :=
CALCULATE(
    SUM(DimOpportunity[Amount]),
    DimOpportunity[Stage] <> "Closed Won",
    DimOpportunity[Stage] <> "Closed Lost"
)

[Win Rate] :=
VAR WonOpportunities =
    CALCULATE(
        COUNTROWS(DimOpportunity),
        DimOpportunity[Stage] = "Closed Won"
    )
VAR TotalClosedOpportunities =
    CALCULATE(
        COUNTROWS(DimOpportunity),
        DimOpportunity[Stage] IN {"Closed Won", "Closed Lost"}
    )
RETURN
    DIVIDE(WonOpportunities, TotalClosedOpportunities, 0)

[Average Deal Size] :=
CALCULATE(
    AVERAGE(DimOpportunity[Amount]),
    DimOpportunity[Stage] = "Closed Won"
)

// ==================== Trend Indicators ====================

[Contract Value Trend] :=
VAR CurrentValue = [Total Contract Value]
VAR PreviousMonthValue =
    CALCULATE(
        [Total Contract Value],
        DATEADD(DimDate[Date], -1, MONTH)
    )
RETURN
    IF(
        CurrentValue > PreviousMonthValue, "▲",
        IF(CurrentValue < PreviousMonthValue, "▼", "→")
    )

Dashboard Layout:

┌────────────────────────────────────────────────────────────────────┐
│  FINANCELEASE AG - MANAGEMENT DASHBOARD         🔄 Last: 08:30 UTC │
├────────────────────────────────────────────────────────────────────┤
│                                                                     │
│  📊 KEY METRICS                                                     │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐  ┌───────────┐│
│  │ Total       │  │ Contract    │  │ Payment     │  │ Portfolio │││
│  │ Contracts   │  │ Value       │  │ Success     │  │ Growth    │││
│  │             │  │             │  │ Rate        │  │ YoY       │││
│  │  45,234  ▲  │  │ €2.1B    ▲  │  │  96.5%   ▼  │  │ +12.3%  ▲││
│  └─────────────┘  └─────────────┘  └─────────────┘  └───────────┘│
│                                                                     │
│  📈 PORTFOLIO COMPOSITION                                           │
│  ┌──────────────────────────────────────────────────────────────┐  │
│  │ [Stacked Bar Chart: Contract Value by Type & Month]         │  │
│  │ ▓▓▓▓▓▓▓ Vehicle Leasing (60%)                               │  │
│  │ ▓▓▓▓ Equipment Leasing (25%)                                │  │
│  │ ▓▓ Mietkauf (15%)                                           │  │
│  └──────────────────────────────────────────────────────────────┘  │
│                                                                     │
│  💰 PAYMENT PERFORMANCE                                             │
│  ┌──────────────────────────────────────────────────────────────┐  │
│  │ [Line Chart: Payment Success Rate Trend]                    │  │
│  │ Last 12 Months: Min 94.2%, Max 97.8%, Avg 96.1%            │  │
│  └──────────────────────────────────────────────────────────────┘  │
│                                                                     │
│  ⚠️ RISK INDICATORS                                                 │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐                │
│  │ High-Risk   │  │ Expected    │  │ DSO         │                │
│  │ Contracts   │  │ Loss        │  │ Days        │                │
│  │  3.2%    ▲  │  │ €12.5M   ▲  │  │  38      →  │                │
│  └─────────────┘  └─────────────┘  └─────────────┘                │
│                                                                     │
│  🎯 SALES PIPELINE                                                  │
│  ┌──────────────────────────────────────────────────────────────┐  │
│  │ [Funnel Chart: Opportunities by Stage]                      │  │
│  │ Pipeline Value: €450M | Win Rate: 32% | Avg Deal: €850K    │  │
│  └──────────────────────────────────────────────────────────────┘  │
└────────────────────────────────────────────────────────────────────┘

Beispiel 2: Risk & Compliance Dashboard

Use Case: Risk-Manager benötigen granulare Einblicke in Portfolio-Risiken, Bonitätsentwicklung, und Compliance-Metriken.

Row-Level Security (RLS) Implementation:

// RLS Role: "Risk Manager - Region DACH"
// Nur Verträge aus Deutschland, Österreich, Schweiz sichtbar

[RLS_Region_DACH] =
    DimCustomer[Country] IN {"DE", "AT", "CH"}

DAX Measures:

// Vintage Analysis: Cohort-basierte Portfolio-Quality
[NPL Ratio by Vintage] :=
VAR VintageYear = SELECTEDVALUE(DimDate[Year])
RETURN
    CALCULATE(
        DIVIDE(
            COUNTROWS(
                FILTER(
                    DimContract,
                    DimContract[Status] = "DEFAULTED" &&
                    YEAR(DimContract[StartDate]) = VintageYear
                )
            ),
            COUNTROWS(
                FILTER(
                    DimContract,
                    YEAR(DimContract[StartDate]) = VintageYear
                )
            ),
            0
        )
    )

// Credit Migration Matrix
[Credit Rating Downgrade %] :=
VAR CustomersWithDowngrade =
    CALCULATE(
        DISTINCTCOUNT(DimCustomer[CustomerKey]),
        DimCustomer[CreditRatingChange] < 0,
        DimDate[Date] >= TODAY() - 365
    )
VAR TotalCustomers = DISTINCTCOUNT(DimCustomer[CustomerKey])
RETURN
    DIVIDE(CustomersWithDowngrade, TotalCustomers, 0)

// IFRS 16 Lease Liability
[Total Lease Liability] :=
SUMX(
    FILTER(DimContract, DimContract[Status] = "ACTIVE"),
    VAR RemainingMonths = DimContract[ContractDurationMonths] -
                           DATEDIFF(DimContract[StartDate], TODAY(), MONTH)
    VAR MonthlyRate = DimContract[LeaseRate]
    VAR DiscountRate = 0.05 / 12 // 5% annual discount rate
    RETURN
        MonthlyRate * ((1 - POWER(1 + DiscountRate, -RemainingMonths)) / DiscountRate)
)

Beispiel 3: Self-Service Analytics für Sales

Use Case: Sales-Manager möchten ad-hoc Analysen zu ihren Opportunities erstellen, ohne IT-Abhängigkeiten.

Power BI Dataflow für Sales-Daten:

{
  "name": "Sales_Analytics_Dataflow",
  "entities": [
    {
      "name": "OpportunitiesEnriched",
      "query": "
        let
          Source = AzureSynapse.Database('synapse-workspace', 'gold'),
          OpportunitiesTable = Source{[Schema='gold', Item='DimOpportunity']}[Data],
          AccountsTable = Source{[Schema='gold', Item='DimCustomer']}[Data],

          // Merge Opportunities with Customer Data
          MergedData = Table.NestedJoin(
            OpportunitiesTable, {'AccountID'},
            AccountsTable, {'CustomerID'},
            'CustomerData', JoinKind.LeftOuter
          ),

          // Expand Customer Fields
          ExpandedData = Table.ExpandTableColumn(
            MergedData, 'CustomerData',
            {'CompanyName', 'Industry', 'EmployeeCount', 'CreditRating'},
            {'Account_Name', 'Account_Industry', 'Account_Size', 'Credit_Rating'}
          ),

          // Add Custom Columns
          AddedColumns = Table.AddColumn(
            ExpandedData, 'Deal_Size_Category',
            each if [Amount] >= 1000000 then 'Large Deal (>1M)'
                 else if [Amount] >= 500000 then 'Medium Deal (500K-1M)'
                 else 'Small Deal (<500K)',
            type text
          ),

          // Calculate Days in Stage
          AddedDaysInStage = Table.AddColumn(
            AddedColumns, 'Days_In_Current_Stage',
            each Duration.Days(DateTime.LocalNow() - [LastStageChangeDate]),
            type number
          ),

          // Add Risk Flag
          AddedRiskFlag = Table.AddColumn(
            AddedDaysInStage, 'At_Risk_Flag',
            each if [Days_In_Current_Stage] > 90 and [Stage] <> 'Closed Won' and [Stage] <> 'Closed Lost'
                 then true else false,
            type logical
          )
        in
          AddedRiskFlag
      "
    }
  ]
}

Performance-Optimierung: Aggregation Tables

Für große Fact-Tabellen (FactContractTransaction hat 50 Mio. Zeilen) nutzen wir Automatic Aggregations:

-- Pre-aggregated table in Synapse
CREATE TABLE gold.FactContractTransaction_Agg_Daily
WITH (
    DISTRIBUTION = HASH(DateKey),
    CLUSTERED COLUMNSTORE INDEX
)
AS
SELECT
    DateKey,
    CustomerKey,
    ContractKey,
    TransactionType,
    PaymentStatus,

    -- Aggregated Metrics
    COUNT(*) AS TransactionCount,
    SUM(Amount) AS TotalAmount,
    AVG(Amount) AS AvgAmount,
    SUM(CASE WHEN DaysOverdue > 0 THEN 1 ELSE 0 END) AS OverdueCount,
    SUM(LateFeeAmount) AS TotalLateFees

FROM gold.FactContractTransaction
GROUP BY
    DateKey,
    CustomerKey,
    ContractKey,
    TransactionType,
    PaymentStatus;

Power BI erkennt automatisch diese Aggregation und nutzt sie für Queries auf Tag-Ebene (statt 50 Mio. Zeilen nur 500K).

Power BI Best Practices für Enterprise Data Hub

  1. DirectQuery für Echtzeit, Import für Performance

    • Management-Dashboards: DirectQuery (immer aktuell)
    • Historische Analysen: Import (schneller)
  2. Incremental Refresh für große Datasets

    // Nur letzte 2 Jahre im Speicher, Rest in DirectQuery
    Table.SelectRows(
        Source,
        each [TransactionDate] >= Date.AddYears(DateTime.LocalNow(), -2)
    )
    
  3. Row-Level Security konsistent über alle Layers

    • In Azure Synapse: SQL-basierte RLS
    • In Power BI: DAX-basierte RLS
    • Beide synchronisiert via Azure AD Groups
  4. Zentralisierte Semantic Models

    • Ein Dataset pro Business-Domain (Contracts, Sales, Risk)
    • Mehrere Reports nutzen dasselbe Dataset
    • "Single Version of Truth"
  5. Monitoring & Alerting

    • Power BI Premium: Query-Performance-Metrics
    • Alert bei Report-Refresh-Failures
    • Usage-Analytics: Welche Reports werden genutzt?

Data Science Lösungen auf dem Data Hub

Der Data Hub ist nicht nur für Reporting-Zwecke konzipiert, sondern auch als Fundament für Advanced Analytics und Machine Learning. Bei FinanceLease AG nutzen wir Azure Machine Learning und Databricks für verschiedene Data-Science-Use-Cases.

Use Case 1: Churn Prediction für Vertragsverlängerungen

Business-Problem: 30% der Leasingverträge werden nach Ablauf nicht verlängert. Können wir Kunden mit hohem Churn-Risk frühzeitig identifizieren und proaktiv ansprechen?

Data Science Ansatz:

# Azure ML Pipeline für Churn Prediction Model

from azureml.core import Workspace, Dataset, Experiment
from azureml.train.automl import AutoMLConfig
import pandas as pd

# Connect to Azure ML Workspace
ws = Workspace.from_config()

# Feature Engineering: Daten aus Gold Layer
query = """
SELECT
    c.ContractKey,
    c.ContractNumber,
    c.ContractType,
    c.LeaseRate,
    c.ContractDurationMonths,
    DATEDIFF(day, GETDATE(), c.EndDate) AS DaysUntilEnd,

    -- Customer Features
    cust.Industry,
    cust.EmployeeCount,
    cust.CreditRating,

    -- Payment Behavior Features
    SUM(CASE WHEN ft.PaymentStatus = 'PAID_ONTIME' THEN 1 ELSE 0 END) AS OnTimePaymentCount,
    SUM(CASE WHEN ft.PaymentStatus = 'OVERDUE' THEN 1 ELSE 0 END) AS OverduePaymentCount,
    AVG(ft.DaysOverdue) AS AvgDaysOverdue,
    SUM(ft.LateFeeAmount) AS TotalLateFees,

    -- Engagement Features
    COUNT(DISTINCT sr.RequestID) AS ServiceRequestCount,
    AVG(sr.ResolutionTimeHours) AS AvgResolutionTime,

    -- Sales Activity Features
    COUNT(DISTINCT opp.OpportunityID) AS NewOpportunitiesCount,
    MAX(opp.LastActivityDate) AS LastSalesContact,

    -- Target Variable
    CASE
        WHEN renewal.ContractID IS NOT NULL THEN 1  -- Renewed
        ELSE 0  -- Churned
    END AS Renewed

FROM gold.DimContract c
LEFT JOIN gold.DimCustomer cust ON c.CustomerID = cust.CustomerID
LEFT JOIN gold.FactContractTransaction ft ON c.ContractKey = ft.ContractKey
LEFT JOIN gold.FactServiceRequest sr ON c.ContractKey = sr.ContractKey
LEFT JOIN gold.DimOpportunity opp ON c.CustomerID = opp.AccountID
LEFT JOIN gold.DimContract renewal ON c.ContractNumber = renewal.PreviousContractNumber

WHERE
    c.EndDate < GETDATE()  -- Only expired contracts
    AND c.EndDate >= DATEADD(year, -2, GETDATE())  -- Last 2 years

GROUP BY
    c.ContractKey, c.ContractNumber, c.ContractType, c.LeaseRate,
    c.ContractDurationMonths, c.EndDate, cust.Industry, cust.EmployeeCount,
    cust.CreditRating, renewal.ContractID
"""

# Load data from Synapse
dataset = Dataset.Tabular.from_sql_query(
    query,
    query_timeout=600,
    validate=True,
    workspace=ws
)

df = dataset.to_pandas_dataframe()

# Train/Test Split
from sklearn.model_selection import train_test_split

X = df.drop(['ContractKey', 'ContractNumber', 'Renewed'], axis=1)
y = df['Renewed']

X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42, stratify=y
)

# AutoML Configuration
automl_config = AutoMLConfig(
    task='classification',
    primary_metric='AUC_weighted',
    training_data=X_train,
    label_column_name='Renewed',
    n_cross_validations=5,
    enable_early_stopping=True,
    experiment_timeout_hours=2,
    max_concurrent_iterations=4,
    featurization='auto'
)

# Run Experiment
experiment = Experiment(ws, 'churn-prediction')
run = experiment.submit(automl_config, show_output=True)

# Get Best Model
best_run, fitted_model = run.get_output()

print(f"Best Model: {best_run.properties['model_name']}")
print(f"AUC: {best_run.get_metrics()['AUC_weighted']:.4f}")

# Register Model
from azureml.core.model import Model

model = Model.register(
    workspace=ws,
    model_path='outputs/model.pkl',
    model_name='churn_prediction_model',
    tags={'type': 'classification', 'framework': 'scikit-learn'},
    description='Predicts contract renewal likelihood'
)

Feature Importance:

Top Features für Churn Prediction:
1. AvgDaysOverdue (0.28) - Zahlungsverzug ist stärkster Indikator
2. ServiceRequestCount (0.18) - Viele Support-Anfragen = Unzufriedenheit
3. LastSalesContact (0.15) - Kürzlicher Kontakt erhöht Retention
4. TotalLateFees (0.12) - Finanzielle Friction
5. EmployeeCount (0.09) - Unternehmensgröße korreliert mit Renewal-Rate

Deployment & Scoring:

# Real-time Scoring Endpoint
from azureml.core.webservice import AciWebservice, Webservice
from azureml.core.model import InferenceConfig

inference_config = InferenceConfig(
    entry_script='score.py',
    environment=env
)

aci_config = AciWebservice.deploy_configuration(
    cpu_cores=1,
    memory_gb=2,
    auth_enabled=True
)

service = Model.deploy(
    workspace=ws,
    name='churn-prediction-service',
    models=[model],
    inference_config=inference_config,
    deployment_config=aci_config
)

service.wait_for_deployment(show_output=True)
print(f"Scoring URI: {service.scoring_uri}")

# Batch Scoring Pipeline (täglich für alle auslaufenden Verträge)
from azureml.pipeline.core import Pipeline, PipelineData
from azureml.pipeline.steps import PythonScriptStep

# Score contracts ending in next 90 days
batch_score_step = PythonScriptStep(
    name='batch_score_churn',
    script_name='batch_score.py',
    arguments=[
        '--model-name', 'churn_prediction_model',
        '--output-table', 'gold.ChurnPredictions'
    ],
    compute_target='ml-compute-cluster',
    allow_reuse=False
)

pipeline = Pipeline(workspace=ws, steps=[batch_score_step])
pipeline_run = pipeline.submit(experiment_name='churn-batch-scoring')

Business Impact:

  • Frühzeitiges Eingreifen bei High-Risk-Kunden (Churn-Score > 0.7)
  • Sales erhält täglich priorisierte Liste für proaktive Kontakte
  • Retention-Rate stieg von 70% auf 82% (12 Prozentpunkte)
  • ROI: €4.5M zusätzliche Vertragsvolumen/Jahr

Use Case 2: Dynamische Restwert-Prognosen für Leasing-Assets

Business-Problem: Restwert-Prognosen von externen Anbietern (DAT/Schwacke) sind generisch. Können wir bessere Prognosen durch eigene Daten trainieren?

Ansatz: Gradient Boosting Regression

# Feature Engineering für Fahrzeug-Restwert-Prognose

import databricks.koalas as ks
from pyspark.sql import functions as F

# Read from Silver Layer
vehicles_df = spark.read.format("delta").load("abfss://silver@datalake/fleet_management/vehicles")
telemetry_df = spark.read.format("delta").load("abfss://silver@datalake/fleet_management/telemetry")

# Aggregate Telemetry Features
telemetry_agg = telemetry_df.groupBy("vehicle_id").agg(
    F.max("mileage").alias("current_mileage"),
    F.avg("mileage").alias("avg_daily_mileage"),
    F.count("*").alias("telemetry_events_count"),
    F.stddev("fuel_level").alias("fuel_consumption_variance")
)

# Join and create features
features_df = vehicles_df.join(telemetry_agg, "vehicle_id", "left") \
    .withColumn("vehicle_age_months", F.months_between(F.current_date(), "first_registration_date")) \
    .withColumn("mileage_per_month", F.col("current_mileage") / F.col("vehicle_age_months")) \
    .withColumn("brand_segment", F.when(F.col("brand").isin(["Mercedes", "BMW", "Audi"]), "Premium")
                                   .otherwise("Standard"))

# Target: Actual Sale Price from historical data
target_df = spark.read.format("delta").load("abfss://silver@datalake/asset_sales")

# Train XGBoost Model
from xgboost import XGBRegressor
from sklearn.metrics import mean_absolute_error, r2_score

model = XGBRegressor(
    n_estimators=500,
    learning_rate=0.05,
    max_depth=7,
    subsample=0.8,
    colsample_bytree=0.8,
    random_state=42
)

model.fit(X_train, y_train)

# Predictions
y_pred = model.predict(X_test)

print(f"MAE: €{mean_absolute_error(y_test, y_pred):.2f}")
print(f"R²: {r2_score(y_test, y_pred):.4f}")

# Results:
# MAE: €850 (DAT/Schwacke: €1,450) - 41% bessere Genauigkeit
# R²: 0.91

Use Case 3: Anomalie-Erkennung bei Zahlungseingängen

Use Case: Frühzeitige Erkennung von abnormalen Zahlungsmustern zur Fraud-Detection.

# Isolation Forest für Anomalie-Erkennung

from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler

# Features: Payment Behavior Patterns
features = [
    'payment_amount',
    'days_since_last_payment',
    'payment_hour',  # Ungewöhnliche Zahlungszeiten
    'amount_deviation_from_avg',
    'payment_frequency_last_30_days'
]

scaler = StandardScaler()
X_scaled = scaler.fit_transform(df[features])

# Train Isolation Forest
iso_forest = IsolationForest(
    contamination=0.05,  # Erwarte 5% Anomalien
    random_state=42
)

df['anomaly_score'] = iso_forest.fit_predict(X_scaled)
df['anomaly_probability'] = iso_forest.score_samples(X_scaled)

# Anomalien zur manuellen Review
anomalies = df[df['anomaly_score'] == -1].sort_values('anomaly_probability')

# Write back to Data Hub
anomalies.to_sql(
    'FactPaymentAnomalies',
    con=synapse_connection,
    schema='gold',
    if_exists='append',
    index=False
)

API-Integration für Dritt-Applikationen

Der Data Hub muss nicht nur Daten konsumieren, sondern auch für externe Systeme zugänglich machen. Bei FinanceLease AG nutzen wir Azure API Management als Gateway.

Architektur: API Layer

┌─────────────────────────────────────────────────────────────┐
│                 External Consumers                           │
│  • Mobile App  • Partner Portal  • Third-Party Tools        │
└───────────────┬─────────────────────────────────────────────┘
                │
                ▼
┌─────────────────────────────────────────────────────────────┐
│           Azure API Management (APIM)                        │
│  • Authentication (OAuth 2.0, API Keys)                     │
│  • Rate Limiting (1000 req/hour per consumer)               │
│  • Caching (Redis)                                          │
│  • Logging & Analytics                                      │
└───────────────┬─────────────────────────────────────────────┘
                │
                ▼
┌─────────────────────────────────────────────────────────────┐
│        Azure Functions (REST API Endpoints)                  │
│  • GET /api/contracts/{id}                                  │
│  • GET /api/customers/{id}/contracts                        │
│  • POST /api/contracts/{id}/payments                        │
│  • GET /api/analytics/kpis                                  │
└───────────────┬─────────────────────────────────────────────┘
                │
                ▼
┌─────────────────────────────────────────────────────────────┐
│        Azure Synapse (Gold Layer) / Cosmos DB                │
│  • Read-Optimized Views                                     │
│  • Row-Level Security enforced                              │
└─────────────────────────────────────────────────────────────┘

Beispiel-API: Contract Details

// Azure Function: Get Contract Details
[FunctionName("GetContractDetails")]
public async Task<IActionResult> GetContractDetails(
    [HttpTrigger(AuthorizationLevel.Function, "get",
                 Route = "contracts/{contractNumber}")] HttpRequest req,
    string contractNumber,
    ILogger log)
{
    // Validate authentication
    if (!ValidateApiKey(req.Headers["X-API-Key"]))
    {
        return new UnauthorizedResult();
    }

    // Row-Level Security: Check user permissions
    var userContext = GetUserContext(req.Headers["Authorization"]);
    if (!await HasAccessToContract(userContext, contractNumber))
    {
        return new ForbidResult();
    }

    // Query Synapse
    var query = @"
        SELECT
            c.ContractNumber,
            c.ContractType,
            c.LeaseRate,
            c.StartDate,
            c.EndDate,
            c.Status,
            cust.CompanyName,
            cust.Industry,
            v.VIN,
            v.Brand,
            v.Model,
            ISNULL(payments.TotalPaid, 0) AS TotalPaid,
            ISNULL(payments.OutstandingAmount, 0) AS OutstandingAmount
        FROM gold.DimContract c
        INNER JOIN gold.DimCustomer cust ON c.CustomerID = cust.CustomerID
        LEFT JOIN gold.DimVehicle v ON c.AssetID = v.VehicleID
        LEFT JOIN (
            SELECT
                ContractKey,
                SUM(CASE WHEN PaymentStatus = 'PAID_ONTIME' THEN Amount ELSE 0 END) AS TotalPaid,
                SUM(CASE WHEN PaymentStatus = 'PENDING' THEN Amount ELSE 0 END) AS OutstandingAmount
            FROM gold.FactContractTransaction
            GROUP BY ContractKey
        ) payments ON c.ContractKey = payments.ContractKey
        WHERE c.ContractNumber = @ContractNumber
    ";

    using var connection = new SqlConnection(_synapseConnectionString);
    var contract = await connection.QueryFirstOrDefaultAsync<ContractDetailDto>(
        query,
        new { ContractNumber = contractNumber }
    );

    if (contract == null)
    {
        return new NotFoundResult();
    }

    // Add enrichments from ML models
    contract.ChurnProbability = await GetChurnPrediction(contractNumber);
    contract.RecommendedActions = await GetRecommendations(contract);

    return new OkObjectResult(contract);
}

OpenAPI Specification:

openapi: 3.0.0
info:
  title: FinanceLease Data Hub API
  version: 1.0.0
  description: Access to contract, customer, and analytics data

security:
  - ApiKeyAuth: []
  - OAuth2: [read:contracts, write:payments]

paths:
  /api/contracts/{contractNumber}:
    get:
      summary: Get contract details
      parameters:
        - name: contractNumber
          in: path
          required: true
          schema:
            type: string
      responses:
        '200':
          description: Contract found
          content:
            application/json:
              schema:
                $ref: '#/components/schemas/Contract'
        '404':
          description: Contract not found
        '401':
          description: Unauthorized

  /api/analytics/kpis:
    get:
      summary: Get real-time KPIs
      parameters:
        - name: date
          in: query
          schema:
            type: string
            format: date
      responses:
        '200':
          description: KPIs retrieved
          content:
            application/json:
              schema:
                $ref: '#/components/schemas/KPIs'

components:
  securitySchemes:
    ApiKeyAuth:
      type: apiKey
      in: header
      name: X-API-Key

  schemas:
    Contract:
      type: object
      properties:
        contractNumber:
          type: string
        contractType:
          type: string
          enum: [Leasing, Mietkauf]
        leaseRate:
          type: number
          format: decimal
        startDate:
          type: string
          format: date
        endDate:
          type: string
          format: date
        status:
          type: string
          enum: [ACTIVE, ENDING_SOON, EXPIRED]
        totalPaid:
          type: number
        outstandingAmount:
          type: number
        churnProbability:
          type: number
          format: float

Rate Limiting & Caching

<!-- APIM Policy für Rate Limiting und Caching -->
<policies>
    <inbound>
        <!-- Authentication -->
        <validate-jwt header-name="Authorization" require-scheme="Bearer">
            <issuer-signing-keys>
                <key>{{jwt-signing-key}}</key>
            </issuer-signing-keys>
            <audiences>
                <audience>api://financelease-datahub</audience>
            </audiences>
        </validate-jwt>

        <!-- Rate Limiting: 1000 requests/hour per subscription -->
        <rate-limit-by-key calls="1000" renewal-period="3600"
                           counter-key="@(context.Subscription.Id)" />

        <!-- Caching für häufig abgerufene Endpoints -->
        <cache-lookup vary-by-developer="true" vary-by-developer-groups="false">
            <vary-by-query-parameter>date</vary-by-query-parameter>
        </cache-lookup>

        <!-- Request Transformation -->
        <set-backend-service base-url="https://datahub-api.azurewebsites.net" />
    </inbound>

    <backend>
        <forward-request timeout="30" />
    </backend>

    <outbound>
        <!-- Cache successful responses für 5 Minuten -->
        <cache-store duration="300" />

        <!-- Add Headers -->
        <set-header name="X-Data-Freshness" exists-action="override">
            <value>@(DateTime.UtcNow.ToString("o"))</value>
        </set-header>
    </outbound>

    <on-error>
        <!-- Error Handling -->
        <set-body>@{
            return new JObject(
                new JProperty("error", context.LastError.Message),
                new JProperty("timestamp", DateTime.UtcNow)
            ).ToString();
        }</set-body>
    </on-error>
</policies>

Zugriffssicherheit: Multi-Layer Security Model

Zugriffssicherheit ist bei Financial-Daten nicht optional. FinanceLease AG implementiert ein Defense-in-Depth-Modell mit mehreren Sicherheitsschichten.

Layer 1: Network Security

┌─────────────────────────────────────────────────────────────┐
│                     Azure Virtual Network                    │
│  ┌────────────────┐  ┌────────────────┐  ┌───────────────┐ │
│  │ Subnet: Data   │  │ Subnet: Compute│  │ Subnet: API   │ │
│  │ (Synapse, SQL) │  │ (Databricks)   │  │ (APIM, Funcs) │ │
│  └────────────────┘  └────────────────┘  └───────────────┘ │
│                                                               │
│  • Private Endpoints für alle Services                       │
│  • Network Security Groups (NSGs) mit Deny-by-Default        │
│  • Azure Firewall für Outbound-Traffic                       │
└─────────────────────────────────────────────────────────────┘

Public Internet
     ↓ (nur via Azure Front Door + WAF)
   [API Management]
     ↓
   Private Network (keine direkte Internet-Exposition)

Layer 2: Identity & Access Management (Azure AD)

// Role-Based Access Control (RBAC)

// Definierte Rollen:
public enum DataHubRole
{
    DataEngineer,        // Voller Zugriff auf alle Layers
    DataAnalyst,         // Read auf Silver/Gold
    BusinessUser,        // Read auf Gold (via Power BI/API)
    DataSci entist,       // Read Silver/Gold + ML Workspace
    AuditUser            // Read-Only für Compliance
}

// Azure AD Group-Mappings
var roleGroups = new Dictionary<DataHubRole, string>
{
    { DataHubRole.DataEngineer, "SG-DataHub-Engineers" },
    { DataHubRole.DataAnalyst, "SG-DataHub-Analysts" },
    { DataHubRole.BusinessUser, "SG-DataHub-BusinessUsers" },
    { DataHubRole.DataScientist, "SG-DataHub-DataScientists" },
    { DataHubRole.AuditUser, "SG-DataHub-Auditors" }
};

Layer 3: Data-Level Security (Row-Level & Column-Level)

-- SQL Server Row-Level Security in Synapse

CREATE SCHEMA Security;
GO

CREATE FUNCTION Security.fn_SecurityPredicate(@CustomerCountry NVARCHAR(2))
RETURNS TABLE
WITH SCHEMABINDING
AS
RETURN
    SELECT 1 AS Result
    WHERE
        -- Data Engineers sehen alles
        IS_MEMBER('SG-DataHub-Engineers') = 1
        OR
        -- Risk Manager nur eigene Region
        (IS_MEMBER('SG-Risk-Manager-DACH') = 1 AND @CustomerCountry IN ('DE', 'AT', 'CH'))
        OR
        (IS_MEMBER('SG-Risk-Manager-BENELUX') = 1 AND @CustomerCountry IN ('BE', 'NL', 'LU'))
        OR
        -- Sales nur eigene Kunden
        (IS_MEMBER('SG-Sales-Team') = 1 AND @CustomerCountry IN (
            SELECT Country FROM gold.DimCustomer
            WHERE SalesRepID = CAST(SESSION_CONTEXT(N'SalesRepID') AS INT)
        ));
GO

CREATE SECURITY POLICY Security.ContractSecurityPolicy
ADD FILTER PREDICATE Security.fn_SecurityPredicate(CustomerCountry)
ON gold.DimContract,
ADD BLOCK PREDICATE Security.fn_SecurityPredicate(CustomerCountry)
ON gold.DimContract AFTER INSERT;
GO

-- Column-Level Security: Masking sensibler Daten
ALTER TABLE gold.DimCustomer
ALTER COLUMN TaxID ADD MASKED WITH (FUNCTION = 'partial(2, "XXXX", 2)');
-- Ergebnis für Non-Privileged Users: "12XXXX89" statt "12345689"

Layer 4: Data Encryption

# Encryption at Rest and in Transit

Encryption at Rest:
  Azure Data Lake:
    - Microsoft-managed keys (Standard)
    - Customer-managed keys via Azure Key Vault (für Gold Layer)
  Azure Synapse:
    - Transparent Data Encryption (TDE) enabled
  Azure SQL:
    - Always Encrypted für hochsensible Spalten (z.B. IBAN)

Encryption in Transit:
  - TLS 1.2+ für alle Verbindungen
  - Azure Private Link für Service-to-Service Communication
  - VPN/ExpressRoute für On-Premise Konnektivität

Layer 5: Audit Logging

-- Audit Log Tabelle in Synapse
CREATE TABLE audit.DataAccessLog (
    LogID BIGINT IDENTITY(1,1) PRIMARY KEY,
    UserPrincipalName NVARCHAR(200),
    UserRole NVARCHAR(100),
    AccessedTable NVARCHAR(200),
    AccessType NVARCHAR(50), -- SELECT, INSERT, UPDATE, DELETE
    RowCount INT,
    QueryText NVARCHAR(MAX),
    ClientIPAddress NVARCHAR(50),
    ApplicationName NVARCHAR(200),
    Timestamp DATETIME2 DEFAULT GETUTCDATE()
);

-- Azure Monitor Integration
-- Alle Zugriffe auf Gold Layer werden geloggt und in Azure Sentinel analysiert
-- Alerts bei:
-- - Ungewöhnlich hohe Anzahl von Queries (mögliches Data Exfiltration)
-- - Zugriff außerhalb Arbeitszeiten
-- - Fehlgeschlagene Authentication-Versuche

Migration bestehender Data Warehouses

Viele Organisationen haben bereits ein Data Warehouse – oft On-Premise SQL Server, Oracle, oder ältere Cloud-Lösungen. Die Migration zu einem modernen Data Hub ist ein schrittweiser Prozess, kein Big-Bang.

Migrations-Strategie: Strangler Fig Pattern

Statt das alte DWH abzuschalten und alles neu zu bauen, migrieren wir inkrementell:

Phase 1: Parallel Betrieb
┌────────────┐         ┌────────────┐
│  Legacy    │         │  New Data  │
│  DWH       │────────→│  Hub       │
│  (On-Prem) │ Replika │  (Azure)   │
└────────────┘         └────────────┘
      ↓                      ↓
  Old Reports          New Reports
  (wird sukzessive migriert)

Phase 2: Hybrid (90% neuer Hub, 10% Legacy)
                    ┌────────────┐
                    │  New Data  │
  ┌────────────┐───→│  Hub       │
  │  Legacy    │    │  (Azure)   │
  │  DWH       │    └────────────┘
  │ (minimiert)│           ↓
  └────────────┘     Most Reports

Phase 3: Full Migration
                    ┌────────────┐
                    │  New Data  │
  [Legacy DWH]────→ │  Hub       │
   decommissioned   │  (Azure)   │
                    └────────────┘
                           ↓
                     All Reports

Migrations-Schritte

Schritt 1: Assessment

# Automated Analysis des bestehenden DWH

import pyodbc
import pandas as pd

# Connect to Legacy DWH
legacy_conn = pyodbc.connect('DSN=LegacyDWH;UID=user;PWD=pass')

# Analyse: Tabellen-Größen
tables_query = """
SELECT
    s.name AS SchemaName,
    t.name AS TableName,
    SUM(p.rows) AS RowCount,
    SUM(a.total_pages) * 8 / 1024 AS SizeMB,
    MAX(st.last_user_update) AS LastModified
FROM sys.tables t
INNER JOIN sys.schemas s ON t.schema_id = s.schema_id
INNER JOIN sys.indexes i ON t.object_id = i.object_id
INNER JOIN sys.partitions p ON i.object_id = p.object_id AND i.index_id = p.index_id
INNER JOIN sys.allocation_units a ON p.partition_id = a.container_id
LEFT JOIN sys.dm_db_index_usage_stats st ON t.object_id = st.object_id
GROUP BY s.name, t.name
ORDER BY SizeMB DESC
"""

tables_df = pd.read_sql(tables_query, legacy_conn)

# Priorisierung
tables_df['Priority'] = tables_df.apply(lambda row:
    'HIGH' if row['RowCount'] > 1000000 or row['SizeMB'] > 1000 else
    'MEDIUM' if row['RowCount'] > 100000 else
    'LOW', axis=1
)

print(f"Total Tables: {len(tables_df)}")
print(f"Total Size: {tables_df['SizeMB'].sum():.2f} MB")
print(f"\nPriority Distribution:")
print(tables_df['Priority'].value_counts())

Schritt 2: Schema-Migration (DDL)

-- Automated Schema Conversion
-- Tool: Azure Database Migration Service (DMS) oder custom scripts

-- Beispiel: Legacy DWH Fact Table
-- Source (Legacy):
CREATE TABLE dbo.FactSales (
    SalesID INT PRIMARY KEY,
    DateKey INT,
    CustomerKey INT,
    Amount DECIMAL(18,2),
    Quantity INT,
    LoadDate DATETIME DEFAULT GETDATE()
);

-- Target (Synapse Dedicated SQL Pool):
CREATE TABLE gold.FactSales (
    SalesID INT NOT NULL,
    DateKey INT NOT NULL,
    CustomerKey INT NOT NULL,
    Amount DECIMAL(18,2),
    Quantity INT,
    LoadDate DATETIME2,

    -- Synapse-spezifische Optimierungen
    PRIMARY KEY NONCLUSTERED (SalesID) NOT ENFORCED
)
WITH (
    DISTRIBUTION = HASH(CustomerKey),  -- Optimiert für Customer-Joins
    CLUSTERED COLUMNSTORE INDEX
);

Schritt 3: Data Migration

# Azure Data Factory Pipeline für Data Migration

from azure.identity import DefaultAzureCredential
from azure.mgmt.datafactory import DataFactoryManagementClient
from azure.mgmt.datafactory.models import *

credential = DefaultAzureCredential()
adf_client = DataFactoryManagementClient(credential, subscription_id)

# Create Pipeline für jede Tabelle
for table in tables_df[tables_df['Priority'] == 'HIGH'].itertuples():
    pipeline_name = f"Migrate_{table.SchemaName}_{table.TableName}"

    # Copy Activity
    copy_activity = CopyActivity(
        name=f"Copy_{table.TableName}",
        inputs=[
            DatasetReference(reference_name='LegacyDWH_Generic', parameters={
                'schemaName': table.SchemaName,
                'tableName': table.TableName
            })
        ],
        outputs=[
            DatasetReference(reference_name='Synapse_Generic', parameters={
                'schemaName': 'gold',
                'tableName': table.TableName
            })
        ],
        source=SqlSource(),
        sink=SqlDWSink(
            pre_copy_script=f"TRUNCATE TABLE gold.{table.TableName}",
            write_batch_size=10000,
            write_batch_timeout='00:10:00'
        ),
        enable_staging=True,  # Nutze Azure Blob als Staging
        staging_settings=StagingSettings(
            linked_service_name='AzureBlobStorage',
            path='staging'
        )
    )

    pipeline = PipelineResource(
        activities=[copy_activity]
    )

    adf_client.pipelines.create_or_update(
        resource_group_name,
        data_factory_name,
        pipeline_name,
        pipeline
    )

    print(f"Created pipeline: {pipeline_name}")

Schritt 4: Reconciliation & Validation

-- Data Validation: Row Counts müssen matched
SELECT
    'Legacy' AS Source,
    COUNT(*) AS RowCount,
    SUM(Amount) AS TotalAmount,
    MAX(LoadDate) AS LatestRecord
FROM LegacyDWH.dbo.FactSales

UNION ALL

SELECT
    'Synapse' AS Source,
    COUNT(*) AS RowCount,
    SUM(Amount) AS TotalAmount,
    MAX(LoadDate) AS LatestRecord
FROM gold.FactSales;

-- Expected: Row Counts und TotalAmount identisch

Schritt 5: Cutover & Decommissioning

Cutover-Checklist:
  ✓ Alle High-Priority Tables migriert und validiert
  ✓ Alle Legacy-Reports auf neuen Data Hub migriert
  ✓ Performance-Tests: Queries ≤ Legacy-Performance
  ✓ User-Acceptance-Tests durchgeführt
  ✓ Rollback-Plan dokumentiert
  ✓ Go-Live-Communication an Stakeholder

Post-Cutover:
  Week 1-2: Parallel Run (Legacy DWH read-only, neue Reports aktiv)
  Week 3-4: Monitoring, Hotfix-Phase
  Month 2: Legacy DWH Decommissioning
  Month 3: Hardware Return / Cost Savings realized

Migrations-Herausforderungen & Lessons Learned

Challenge 1: Query-Kompatibilität

  • Legacy T-SQL Queries nutzen oft Synapse-inkompatible Features
  • Solution: Query-Rewrite oder Compatibility Layer (Views)

Challenge 2: Performance-Regression

  • Einige Queries langsamer auf Synapse als On-Premise (Netzwerk-Latenz)
  • Solution: Materialized Views, Aggressive Caching, Query-Optimierung

Challenge 3: Change Management

  • User-Widerstand gegen "neue" Tools
  • Solution: Intensive Trainings, Champions-Program, Quick Wins zeigen

Challenge 4: Downtime-Fenster

  • 24/7-Betrieb erlaubt kein Wartungsfenster
  • Solution: Blue-Green-Deployment, schrittweise Migration

Fazit & Best Practices

Der Aufbau eines modernen Data Hubs für ein Leasing-Unternehmen ist ein strategisches, multi-dimensionales Projekt, das weit über die reine Technologie hinausgeht. Basierend auf der Erfahrung mit FinanceLease AG und ähnlichen Projekten hier die wichtigsten Erkenntnisse:

Architektur:

  1. Medallion-Architektur (Bronze-Silver-Gold) als bewährtes Pattern
  2. Delta Lake für ACID-Transaktionen und Time-Travel
  3. Synapse Dedicated SQL Pool für performantes BI-Reporting
  4. Separation of Concerns: Ingestion ≠ Processing ≠ Consumption

Datenqualität: 5. Data Quality ist kein einmaliges Projekt, sondern kontinuierlicher Prozess 6. Automatisierte Data Quality Checks in jeder Layer-Transformation 7. Data Lineage und Governance (Azure Purview) von Tag 1

Analytics & ML: 8. Data Hub als Fundament für Advanced Analytics, nicht nur Reporting 9. Feature Store für reproduzierbare ML-Features 10. MLOps-Praktiken: Versionierung, CI/CD für Models

Security: 11. Defense-in-Depth: Network, Identity, Data-Level Security 12. Encryption at Rest und in Transit ist Standard, kein Optional 13. Audit Logging für Compliance (DSGVO, BaFin)

Migration: 14. Strangler Fig Pattern statt Big-Bang 15. Automatisierte Assessment-Tools nutzen 16. Mindestens 20% Puffer für unerwartete Komplexität

Organisation: 17. Cross-funktionale Teams (Data Engineers, Analysts, Business) 18. Dedicated Data Product Owner mit Business-Background 19. Self-Service als Ziel, aber mit klaren Governance-Guard-Rails

Messbare Erfolge bei FinanceLease AG:

  • Time-to-Insight: 2 Wochen → 2 Stunden (95% Reduktion)
  • Data Quality: 78% → 96% korrekte Daten
  • Analytics-Adoption: 12% → 67% der Business-User nutzen aktiv Reports
  • Cost: €420K Legacy-DWH → €180K Cloud (57% Reduktion)
  • Business Impact: €4.5M zusätzlicher Revenue durch Churn-Prevention

Der moderne Data Hub ist kein Projekt, sondern ein Produkt – ein lebendes System, das kontinuierlich weiterentwickelt wird und messbaren Business-Value liefert.

← Zurück zu allen Publikationen