Enterprise Data Hub für Leasing-Unternehmen Beispiel: Von isolierten Datensilos zur integrierten Datenplattform
Enterprise Data Hub für Leasing-Unternehmen: Von isolierten Datensilos zur integrierten Datenplattform
Die Herausforderung: Datensilos in der Leasing-Branche
Leasing- und Finanzierungsunternehmen stehen vor einer besonderen Herausforderung: Ihre Geschäftsprozesse erzeugen Daten über zahlreiche, oft historisch gewachsene Systeme hinweg. Ein typisches mittelständisches Leasing-Unternehmen mit 500 Millionen Euro Leasingvolumen jongliert mit:
- Kernbanksystem für Vertragsmanagement und Buchhaltung (oft Legacy-Systeme wie SAP Banking oder proprietäre Software)
- CRM-System für Vertrieb und Kundenbeziehungen (Salesforce, Microsoft Dynamics)
- Dokumentenmanagementsystem für Verträge, Policen, Korrespondenz
- Risikomanagement-Tools für Bonitätsprüfung und Portfolio-Analysen
- Payment-Processing-Systeme für SEPA-Lastschriften und Zahlungsabwicklung
- Fleet-Management-Systeme (bei Fahrzeug-Leasing)
- Asset-Management-Systeme für Anlagegüter-Tracking
- Partner-Portale für Händler und Vertriebspartner
Jedes dieser Systeme produziert wertvolle Daten – aber in isolierten Silos, unterschiedlichen Formaten, mit inkonsistenten Definitionen und ohne zentrale Governance. Die Konsequenzen sind bekannt und schmerzhaft:
Business Impact:
- Entscheidungen ohne vollständiges Bild: Risk-Manager sehen Bonitätsdaten, aber nicht die aktuelle Zahlungshistorie aus dem Payment-System
- Reporting-Chaos: Monatliche Management-Reports erfordern manuelle Excel-Konsolidierung aus 8+ Systemen
- Missed Opportunities: Cross-Selling-Potenziale bleiben unentdeckt, weil Vertriebs- und Vertragsdaten nicht verknüpft sind
- Compliance-Risiken: IFRS 16 Reporting, BaFin-Meldungen und DSGVO-Anfragen dauern Wochen statt Stunden
- Ineffizienz: Data Analysts verbringen 70% ihrer Zeit mit Datenaufbereitung statt mit Analysen
Technische Schulden:
- Point-to-Point-Integrationen führen zu einem unmaintainbaren Integrations-Spaghetti
- Duplikate und Inkonsistenzen: Ist "Müller GmbH" gleich "Mueller GmbH" gleich "Müller Holding"?
- Keine Single Source of Truth für Stammdaten
- Fehlende Historisierung: "Was war der Vertragsstand vor 6 Monaten?" ist unbeantwortbar
Die Lösung: Ein moderner Enterprise Data Hub, der als zentrale Datenplattform fungiert und alle relevanten Daten konsolidiert, standardisiert, anreichert und für verschiedene Konsumenten (BI-Tools, Data Science, APIs) bereitstellt.
Dieser Artikel dokumentiert den Aufbau eines solchen Data Hubs für ein fiktives, aber realistisches Leasing-Unternehmen "FinanceLease AG" – mit 800 Mitarbeitern, 2 Milliarden Euro Leasingvolumen, 45.000 aktiven Verträgen und Geschäftsbereichen in Fahrzeug-Leasing, Equipment-Leasing und Mietkauf. Die beschriebene Architektur, Technologien und Patterns sind direkt auf reale Projekte übertragbar.
Datenquellen: Das heterogene Quellsystem-Ökosystem
Übersicht der Datenquellen bei FinanceLease AG
Ein Data Hub ist nur so gut wie seine Fähigkeit, Daten aus verschiedensten Quellen zu integrieren. Bei FinanceLease AG haben wir folgende Quellsysteme identifiziert:
1. Core Banking System (SAP Banking / Finnova)
Typ: On-Premise SQL Server Database (Legacy) Datenvolumen: 500 GB, 200 Mio. Zeilen über alle Tabellen Kritische Daten:
- Vertragsdetails (Vertrags-Nr., Laufzeit, Raten, Konditionen)
- Kundenstammdaten (Firmendaten, Adressen, Kontaktpersonen)
- Buchungsdaten (Zahlungseingänge, Forderungen, Mahnungen)
- Asset-Informationen (Fahrzeuge, Maschinen, Equipment-Details)
- Versicherungsdaten
Technische Charakteristika:
- Keine native API, nur direkter DB-Zugriff möglich
- Komplexes, denormalisiertes Schema (über 1.200 Tabellen)
- Keine Change Data Capture (CDC) – nur Timestamps für Updates
- Geschäftskritisch: 24/7 Betrieb, Lesezugriffe müssen Performance-neutral sein
Extraktionsstrategie:
-- Incremental Load Beispiel: Neue/geänderte Verträge seit letztem Load
SELECT
VertragID,
KundenID,
Vertragsnummer,
Leasingobjekt,
Leasingrate,
Laufzeit,
Startdatum,
Enddatum,
Status,
LastModifiedDate
FROM Vertraege
WHERE LastModifiedDate > @LastExtractTimestamp
OR CreatedDate > @LastExtractTimestamp
Herausforderungen:
- Legacy-Encoding (ISO-8859-1 statt UTF-8) → Umlaute-Probleme
- Inkonsistente Löschlogik: Soft-Deletes nicht einheitlich implementiert
- Historisierung fehlt: Vertrags-Änderungen überschreiben alte Werte
- Performance: Full-Table-Scans bei fehlenden Indizes
2. CRM-System (Salesforce)
Typ: Cloud SaaS, REST API Datenvolumen: 120.000 Leads, 35.000 Opportunities, 50.000 Accounts Kritische Daten:
- Sales Pipeline (Opportunities, Stages, Forecast)
- Lead-Daten und Konversionstracking
- Account-Hierarchien (Konzernstrukturen)
- Aktivitäten (Calls, Meetings, E-Mails)
- Custom Objects für Leasing-spezifische Workflows
Technische Charakteristika:
- Salesforce REST API v58.0
- Bulk API für große Datenmengen (> 10.000 Records)
- API-Limits: 100.000 Calls/24h (wird bei großen Extracts schnell kritisch)
- Webhook-Support für Echtzeit-Notifikationen
Extraktionsstrategie:
# Salesforce Bulk API Extraktion
from simple_salesforce import Salesforce, SalesforceBulk
sf = Salesforce(username=SF_USER, password=SF_PASS, security_token=SF_TOKEN)
bulk = SalesforceBulk(sessionId=sf.session_id, host=sf.sf_instance)
# Bulk Query für große Datenmengen
query = """
SELECT Id, Name, Amount, StageName, CloseDate, LeaseType__c,
ContractTerm__c, LastModifiedDate
FROM Opportunity
WHERE LastModifiedDate > LAST_N_DAYS:1
"""
job = bulk.create_query_job("Opportunity", contentType='JSON')
batch = bulk.query(job, query)
bulk.close_job(job)
# Ergebnisse abholen
results = bulk.get_all_results_for_query_batch(batch)
opportunities = [record for result in results for record in result]
Herausforderungen:
- API-Rate-Limiting erfordert intelligente Batch-Steuerung
- Custom Fields folgen Naming Convention:
FieldName__c - Lookup-Relationships müssen aufgelöst werden (z.B. Account-Hierarchien)
- Zeitzone-Handling: Salesforce UTC, Core Banking CET
3. Payment Processing System (SEPA Gateway)
Typ: REST API + SFTP für Batch-Files Datenvolumen: 1.5 Mio. Transaktionen/Jahr Kritische Daten:
- SEPA-Lastschriften (mandats, status, rejection reasons)
- Zahlungseingänge und -ausgänge
- Bank-Connection-Status
- Return-Debit-Informationen
Technische Charakteristika:
- Real-time API für Transaktions-Status
- Täglich SFTP-Upload: PAIN.008 XML (SEPA Lastschrift-Einreichung)
- Täglich SFTP-Download: CAMT.053 XML (Kontoauszüge)
- Webhook für Payment-Status-Changes
Extraktionsstrategie:
// CAMT.053 XML Parsing (ISO 20022 Standard)
public class CAMT053Parser
{
public List<PaymentTransaction> ParseBankStatement(string xmlContent)
{
var doc = XDocument.Parse(xmlContent);
XNamespace ns = "urn:iso:std:iso:20022:tech:xsd:camt.053.001.02";
var transactions = doc.Descendants(ns + "Ntry")
.Select(entry => new PaymentTransaction
{
BookingDate = DateTime.Parse(entry.Element(ns + "BookgDt")
?.Element(ns + "Dt")?.Value),
Amount = decimal.Parse(entry.Element(ns + "Amt")?.Value),
Currency = entry.Element(ns + "Amt")?.Attribute("Ccy")?.Value,
DebtorName = entry.Descendants(ns + "Dbtr")
.FirstOrDefault()?.Element(ns + "Nm")?.Value,
DebtorIBAN = entry.Descendants(ns + "DbtrAcct")
.FirstOrDefault()?.Element(ns + "Id")
?.Element(ns + "IBAN")?.Value,
Reference = entry.Descendants(ns + "RmtInf")
.FirstOrDefault()?.Element(ns + "Ustrd")?.Value,
Status = MapBookingStatus(entry.Element(ns + "Sts")?.Value)
})
.ToList();
return transactions;
}
}
Herausforderungen:
- XML-Parsing von ISO 20022 Standards (komplex, verschachtelt)
- Reconciliation: Matching von SEPA-Lastschriften zu Verträgen via Referenz
- Late Rejections: Rückläufer können bis zu 8 Wochen nach Einzug kommen
- Duplicate Detection: Wiederholte Downloads derselben Dateien
4. Fleet Management System (Eigensystem)
Typ: PostgreSQL Database + REST API Datenvolumen: 15.000 Fahrzeuge, 2 Mio. Telematik-Events/Tag Kritische Daten:
- Fahrzeugstammdaten (Marke, Modell, VIN, Erstzulassung)
- Standortdaten (GPS-Tracking)
- Kilometer-Stände
- Wartungs- und Reparaturhistorie
- Schaden-Meldungen
Technische Charakteristika:
- PostgreSQL 15 mit PostGIS Extension (Geo-Daten)
- REST API für CRUD-Operationen
- Event-Stream via Apache Kafka für Telematik-Daten (Echtzeit)
- S3-Bucket für Fahrzeug-Bilder und Schadenfotos
Extraktionsstrategie:
# Kafka Consumer für Telematik-Stream
from kafka import KafkaConsumer
import json
consumer = KafkaConsumer(
'vehicle-telemetry',
bootstrap_servers=['kafka-broker:9092'],
value_deserializer=lambda m: json.loads(m.decode('utf-8')),
auto_offset_reset='earliest',
enable_auto_commit=True,
group_id='data-hub-consumer'
)
for message in consumer:
telemetry_event = message.value
# {
# "vehicle_id": "VIN12345",
# "timestamp": "2025-01-15T10:30:00Z",
# "latitude": 52.520008,
# "longitude": 13.404954,
# "mileage": 45230,
# "fuel_level": 45.5,
# "battery_voltage": 12.6
# }
# Stream to Data Hub (z.B. Azure Event Hub oder Kafka Topic)
send_to_data_hub(telemetry_event)
Herausforderungen:
- Hohe Event-Frequenz: 2 Mio. Events/Tag = 23 Events/Sekunde durchschnittlich
- Geo-Daten erfordern spezielle Handling (nicht standard SQL)
- Image-Daten: Referenzen vs. Blob-Storage-Integration
5. Externe Datenquellen
Bonitätsdaten (SCHUFA API):
- REST API, Rate-Limit: 1000 Calls/Stunde
- On-Demand-Abfragen bei Neuverträgen
- Historische Scores werden im Data Hub persistiert
Fahrzeugbewertungen (DAT/Schwacke API):
- REST API für Restwert-Prognosen
- Batch-Updates: Monatliche Neubewertung des gesamten Portfolios
- CSV-Export-Option für Bulk-Daten
Makroökonomische Daten (Bundesbank API):
- Zinssätze, Inflationsraten für Risk-Modelle
- Open Data, keine Authentication
- Täglich aktualisierte Zeitreihen
Datenquellen-Übersicht: Zusammenfassung
| Quelle | Typ | Volumen | Update-Frequenz | Zugriffsmethode | Kritikalität |
|---|---|---|---|---|---|
| Core Banking | On-Prem DB | 500 GB | Real-time | DB Query | Critical |
| CRM (Salesforce) | Cloud API | 50K Accounts | Real-time | REST API | High |
| Payment Processing | Hybrid | 1.5M tx/Jahr | Täglich + RT | SFTP + API | Critical |
| Fleet Management | On-Prem DB | 15K Vehicles | Real-time Stream | Kafka + API | High |
| SCHUFA | External API | On-Demand | On-Demand | REST API | High |
| DAT/Schwacke | External API | Batch | Monatlich | REST API | Medium |
| Bundesbank | Open Data | Zeitreihen | Täglich | REST API | Low |
Wichtige Erkenntnisse für die Data-Hub-Architektur:
- Heterogenität: Mix aus Legacy-Datenbanken, moderne Cloud-APIs, File-Drops und Event-Streams
- Latenz-Anforderungen: Von Echtzeit (Telematik) bis Batch (Bewertungen)
- Volumen-Varianz: Von wenigen KB (Makrodaten) bis GB-Range (Core Banking)
- Kritikalität: Payment und Core Banking sind geschäftskritisch → Fehlertoleranz essentiell
- Governance: Externe APIs haben Rate-Limits und Kosten → intelligente Caching-Strategie nötig
Zentrale Data Hub Komponenten: Die Architektur
Ein moderner Data Hub für ein Leasing-Unternehmen folgt einer mehrschichtigen Architektur, die verschiedene Datenverarbeitungs-Paradigmen unterstützt: Batch, Streaming, und On-Demand-Processing. Die Architektur orientiert sich an der Medallion-Architektur (Bronze-Silver-Gold), die sich in Data-Lake-Szenarien bewährt hat.
High-Level Architekturübersicht
┌─────────────────────────────────────────────────────────────────────────┐
│ DATA SOURCES │
│ Core Banking │ Salesforce │ Payment │ Fleet Mgmt │ External APIs │
└─────────────┬───────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────────────┐
│ INGESTION LAYER │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ Azure Data │ │ Azure Event │ │ Azure │ │
│ │ Factory │ │ Hub │ │ Functions │ │
│ │ (Batch/CDC) │ │ (Streaming) │ │ (API Pulls) │ │
│ └──────────────┘ └──────────────┘ └──────────────┘ │
└─────────────┬───────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────────────┐
│ BRONZE LAYER (RAW DATA) │
│ Azure Data Lake Gen2 / Delta Lake │
│ • Raw data, as-is from sources │
│ • Partitioned by source_system / ingestion_date │
│ • Immutable, append-only │
└─────────────┬───────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────────────┐
│ PROCESSING LAYER (ETL/ELT) │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ Azure │ │ Databricks │ │ Azure │ │
│ │ Synapse │ │ Spark │ │ Stream │ │
│ │ Pipelines │ │ (Complex) │ │ Analytics │ │
│ └──────────────┘ └──────────────┘ └──────────────┘ │
└─────────────┬───────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────────────┐
│ SILVER LAYER (CLEANED & VALIDATED) │
│ Azure Data Lake Gen2 / Delta Lake │
│ • Cleaned, validated, standardized │
│ • Schema enforcement │
│ • De-duplication, data quality checks │
└─────────────┬───────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────────────┐
│ GOLD LAYER (BUSINESS-LEVEL AGGREGATES) │
│ Azure Synapse Dedicated SQL Pool / Delta Lake │
│ • Business entities (Customer 360°, Contract View) │
│ • Pre-aggregated KPIs │
│ • Dimensional models (Star Schema) │
└─────────────┬───────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────────────┐
│ CONSUMPTION LAYER │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │ Power BI│ │ Azure ML│ │ REST API│ │ External│ │
│ │ Reports │ │ Models │ │ (Apps) │ │ Tools │ │
│ └─────────┘ └─────────┘ └─────────┘ └─────────┘ │
└─────────────────────────────────────────────────────────────────────────┘
Cross-Cutting Concerns:
• Azure Purview (Data Governance, Catalog)
• Azure Key Vault (Secrets Management)
• Azure Monitor + App Insights (Monitoring)
• Azure Active Directory (Authentication/Authorization)
Komponente 1: Ingestion Layer
Technologie-Stack:
- Azure Data Factory für Batch-Ingestion (Datenbanken, APIs mit Bulk-Export)
- Azure Event Hub für Streaming-Ingestion (Kafka-kompatibel, Telematik-Daten)
- Azure Functions für On-Demand API-Calls (z.B. SCHUFA bei Vertragsabschluss)
Azure Data Factory Pipeline Beispiel:
{
"name": "IngestCoreBankingContracts",
"properties": {
"activities": [
{
"name": "CopyContractsIncremental",
"type": "Copy",
"inputs": [
{
"referenceName": "SourceSQLServer",
"type": "DatasetReference",
"parameters": {
"tableName": "Vertraege",
"watermarkColumn": "LastModifiedDate",
"watermarkValue": "@pipeline().parameters.LastWatermark"
}
}
],
"outputs": [
{
"referenceName": "BronzeDataLake",
"type": "DatasetReference",
"parameters": {
"folderPath": "bronze/core_banking/contracts/@{formatDateTime(utcnow(),'yyyy/MM/dd')}",
"fileName": "contracts_@{formatDateTime(utcnow(),'yyyyMMdd_HHmmss')}.parquet"
}
}
],
"typeProperties": {
"source": {
"type": "SqlServerSource",
"sqlReaderQuery": {
"value": "SELECT * FROM Vertraege WHERE LastModifiedDate > '@{pipeline().parameters.LastWatermark}'",
"type": "Expression"
}
},
"sink": {
"type": "ParquetSink",
"storeSettings": {
"type": "AzureBlobFSWriteSettings"
},
"formatSettings": {
"type": "ParquetWriteSettings",
"compressionCodec": "snappy"
}
},
"enableStaging": false,
"translator": {
"type": "TabularTranslator",
"mappings": [
{
"source": { "name": "VertragID" },
"sink": { "name": "contract_id", "type": "Int64" }
},
{
"source": { "name": "Vertragsnummer" },
"sink": { "name": "contract_number", "type": "String" }
}
]
}
}
},
{
"name": "UpdateWatermark",
"type": "SqlServerStoredProcedure",
"dependsOn": [
{
"activity": "CopyContractsIncremental",
"dependencyConditions": [ "Succeeded" ]
}
],
"typeProperties": {
"storedProcedureName": "usp_UpdateWatermark",
"storedProcedureParameters": {
"TableName": { "value": "Vertraege" },
"WatermarkValue": { "value": "@{utcnow()}" }
}
}
}
],
"parameters": {
"LastWatermark": {
"type": "String",
"defaultValue": "1900-01-01T00:00:00Z"
}
}
}
}
Design Principles:
- Idempotenz: Wiederholte Ausführung derselben Pipeline mit denselben Parametern produziert identische Ergebnisse
- Watermarking: Incremental Load via Timestamps vermeidet Full-Table-Scans
- Partitionierung: Daten nach Datum partitioniert für effiziente Queries und Retention-Management
- Fehlertoleranz: Bei Fehler wird Wassermarke nicht aktualisiert → nächster Lauf holt fehlende Daten nach
Komponente 2: Bronze Layer (Raw Data)
Technologie: Azure Data Lake Storage Gen2 mit Delta Lake Format
Charakteristika:
- Immutable: Daten werden nie verändert, nur hinzugefügt (Append-Only)
- Schema-on-Read: Flexibles Schema, keine Enforcement
- Vollständige Historie: Alle ingested Daten bleiben erhalten (subject to retention policy)
- Partitionierung: Nach
source_system,ingestion_date
Beispiel-Struktur:
bronze/
├── core_banking/
│ ├── contracts/
│ │ ├── year=2025/
│ │ │ ├── month=01/
│ │ │ │ ├── day=15/
│ │ │ │ │ ├── contracts_20250115_080000.parquet
│ │ │ │ │ └── contracts_20250115_200000.parquet
│ ├── customers/
│ ├── payments/
├── salesforce/
│ ├── opportunities/
│ ├── accounts/
│ ├── leads/
├── fleet_management/
│ ├── vehicles/
│ ├── telemetry/ (partitioned by hour for high-frequency data)
Delta Lake Beispiel:
from delta.tables import DeltaTable
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName("BronzeIngestion") \
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
.getOrCreate()
# Read from source (z.B. Kafka)
raw_df = spark.read \
.format("kafka") \
.option("kafka.bootstrap.servers", "kafka-broker:9092") \
.option("subscribe", "vehicle-telemetry") \
.load()
# Schema-Definition (optional, für Validierung)
from pyspark.sql.types import *
schema = StructType([
StructField("vehicle_id", StringType(), False),
StructField("timestamp", TimestampType(), False),
StructField("latitude", DoubleType(), True),
StructField("longitude", DoubleType(), True),
StructField("mileage", IntegerType(), True),
StructField("fuel_level", DoubleType(), True)
])
# Parse JSON
from pyspark.sql.functions import from_json, col, current_timestamp
telemetry_df = raw_df.select(
from_json(col("value").cast("string"), schema).alias("data"),
col("timestamp").alias("kafka_timestamp")
).select("data.*", "kafka_timestamp") \
.withColumn("ingestion_timestamp", current_timestamp())
# Write to Bronze (Delta Lake)
telemetry_df.write \
.format("delta") \
.mode("append") \
.partitionBy("year", "month", "day", "hour") \
.save("abfss://bronze@datalake.dfs.core.windows.net/fleet_management/telemetry")
Komponente 3: Silver Layer (Cleaned & Validated)
Technologie: Delta Lake + Azure Databricks für komplexe Transformationen
Transformationen:
Data Quality Checks
- Null-Checks für Pflichtfelder
- Range-Validierung (z.B. Leasingrate > 0)
- Referential Integrity (z.B. existiert KundenID in Kundenstamm?)
Standardisierung
- Einheitliche Datumsformate (ISO 8601)
- Währungskonvertierung (alle Beträge in EUR)
- Adress-Normalisierung
De-Duplication
- Window-Functions für Duplikat-Erkennung
- Merge-Logik für "Same Contract, Multiple Sources"
Enrichment
- Lookup zu Referenzdaten
- Berechnete Felder (z.B. "Days Until Contract End")
Beispiel: Contract Silver Layer Processing
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from delta.tables import DeltaTable
# Read Bronze
bronze_contracts = spark.read \
.format("delta") \
.load("abfss://bronze@datalake.dfs.core.windows.net/core_banking/contracts")
# Data Quality: Filter out invalid records
valid_contracts = bronze_contracts.filter(
(F.col("contract_number").isNotNull()) &
(F.col("contract_number") != "") &
(F.col("lease_rate") > 0) &
(F.col("start_date") <= F.col("end_date"))
)
# De-Duplication: Nehme neuesten Record pro contract_number
window_spec = Window.partitionBy("contract_number").orderBy(F.desc("LastModifiedDate"))
deduped_contracts = valid_contracts.withColumn("row_num", F.row_number().over(window_spec)) \
.filter(F.col("row_num") == 1) \
.drop("row_num")
# Standardization
standardized_contracts = deduped_contracts \
.withColumn("start_date", F.to_date("start_date", "yyyy-MM-dd")) \
.withColumn("end_date", F.to_date("end_date", "yyyy-MM-dd")) \
.withColumn("lease_rate", F.round("lease_rate", 2)) \
.withColumn("currency", F.lit("EUR"))
# Enrichment: Calculate derived fields
enriched_contracts = standardized_contracts \
.withColumn("contract_duration_months",
F.months_between("end_date", "start_date")) \
.withColumn("days_until_end",
F.datediff("end_date", F.current_date())) \
.withColumn("contract_status",
F.when(F.col("days_until_end") < 0, "EXPIRED")
.when(F.col("days_until_end") <= 90, "ENDING_SOON")
.otherwise("ACTIVE"))
# Write to Silver (with MERGE for upserts)
silver_path = "abfss://silver@datalake.dfs.core.windows.net/contracts"
if DeltaTable.isDeltaTable(spark, silver_path):
silver_table = DeltaTable.forPath(spark, silver_path)
silver_table.alias("target").merge(
enriched_contracts.alias("source"),
"target.contract_id = source.contract_id"
).whenMatchedUpdateAll() \
.whenNotMatchedInsertAll() \
.execute()
else:
enriched_contracts.write \
.format("delta") \
.mode("overwrite") \
.save(silver_path)
# Data Quality Metrics
dq_metrics = {
"total_bronze_records": bronze_contracts.count(),
"valid_records": valid_contracts.count(),
"invalid_records": bronze_contracts.count() - valid_contracts.count(),
"duplicates_removed": valid_contracts.count() - deduped_contracts.count(),
"final_silver_records": enriched_contracts.count()
}
# Log metrics
print(f"Data Quality Metrics: {dq_metrics}")
Komponente 4: Gold Layer (Business-Level)
Technologie: Azure Synapse Dedicated SQL Pool + Delta Lake
Charakteristika:
- Business Entities: Nicht technische Tabellen, sondern Business-Konzepte
- Dimensional Modeling: Star Schema für optimale BI-Performance
- Pre-Aggregated KPIs: Für schnelle Dashboard-Performance
- Historisierung: Slowly Changing Dimensions (SCD Type 2)
Beispiel: Customer 360° View
-- Gold Layer: Customer 360° Dimensional Model
-- Dimension: Customer
CREATE TABLE gold.DimCustomer (
CustomerKey INT IDENTITY(1,1) PRIMARY KEY,
CustomerID VARCHAR(50) NOT NULL,
CustomerNumber VARCHAR(50),
CompanyName NVARCHAR(200),
Industry NVARCHAR(100),
EmployeeCount INT,
AnnualRevenue DECIMAL(18,2),
CreditRating VARCHAR(10),
-- SCD Type 2 fields
EffectiveFrom DATE NOT NULL,
EffectiveTo DATE,
IsCurrent BIT NOT NULL DEFAULT 1,
-- Audit
CreatedDate DATETIME2 DEFAULT GETUTCDATE(),
ModifiedDate DATETIME2 DEFAULT GETUTCDATE()
);
-- Dimension: Contract
CREATE TABLE gold.DimContract (
ContractKey INT IDENTITY(1,1) PRIMARY KEY,
ContractID VARCHAR(50) NOT NULL,
ContractNumber VARCHAR(50) NOT NULL,
ContractType NVARCHAR(50), -- 'Leasing', 'Mietkauf'
AssetType NVARCHAR(50), -- 'Vehicle', 'Equipment'
AssetDescription NVARCHAR(500),
LeaseRate DECIMAL(18,2),
ContractDurationMonths INT,
StartDate DATE,
EndDate DATE,
Status NVARCHAR(50), -- 'ACTIVE', 'ENDING_SOON', 'EXPIRED'
-- Audit
CreatedDate DATETIME2 DEFAULT GETUTCDATE(),
ModifiedDate DATETIME2 DEFAULT GETUTCDATE()
);
-- Fact: Contract Transactions
CREATE TABLE gold.FactContractTransaction (
TransactionKey BIGINT IDENTITY(1,1) PRIMARY KEY,
DateKey INT NOT NULL, -- FK to DimDate
CustomerKey INT NOT NULL, -- FK to DimCustomer
ContractKey INT NOT NULL, -- FK to DimContract
TransactionType NVARCHAR(50), -- 'PAYMENT', 'INVOICE', 'ADJUSTMENT'
Amount DECIMAL(18,2),
Currency CHAR(3),
PaymentMethod NVARCHAR(50),
PaymentStatus NVARCHAR(50),
-- Metrics
DaysOverdue INT,
LateFeeAmount DECIMAL(18,2),
TransactionDate DATE,
CreatedDate DATETIME2 DEFAULT GETUTCDATE()
);
-- Aggregated KPI Table
CREATE TABLE gold.ContractKPIs (
SnapshotDate DATE NOT NULL,
ContractType NVARCHAR(50),
TotalActiveContracts INT,
TotalContractValue DECIMAL(18,2),
AverageLeaseRate DECIMAL(18,2),
PaymentsOnTime INT,
PaymentsOverdue INT,
PaymentSuccessRate DECIMAL(5,2),
AverageContractDuration INT,
ContractsEndingNext90Days INT,
PRIMARY KEY (SnapshotDate, ContractType)
);
Materialized View für Customer 360°:
CREATE VIEW gold.vw_Customer360 AS
SELECT
c.CustomerKey,
c.CompanyName,
c.Industry,
c.CreditRating,
-- Contract Metrics
COUNT(DISTINCT ct.ContractKey) AS TotalContracts,
SUM(CASE WHEN ct.Status = 'ACTIVE' THEN 1 ELSE 0 END) AS ActiveContracts,
SUM(ct.LeaseRate) AS TotalMonthlyLeaseAmount,
-- Payment Metrics
SUM(CASE WHEN ft.PaymentStatus = 'PAID_ONTIME' THEN 1 ELSE 0 END) AS PaymentsOnTime,
SUM(CASE WHEN ft.PaymentStatus = 'OVERDUE' THEN 1 ELSE 0 END) AS PaymentsOverdue,
SUM(ft.LateFeeAmount) AS TotalLateFees,
-- Salesforce Metrics
COUNT(DISTINCT opp.OpportunityID) AS TotalOpportunities,
SUM(opp.Amount) AS PipelineValue,
-- Risk Indicators
MAX(c.CreditRating) AS CurrentCreditRating,
AVG(CAST(ft.DaysOverdue AS FLOAT)) AS AvgDaysOverdue,
-- Last Activity
MAX(ft.TransactionDate) AS LastPaymentDate,
MAX(opp.LastActivityDate) AS LastSalesActivity
FROM gold.DimCustomer c
LEFT JOIN gold.DimContract ct ON c.CustomerID = ct.CustomerID
LEFT JOIN gold.FactContractTransaction ft ON ct.ContractKey = ft.ContractKey
LEFT JOIN gold.DimOpportunity opp ON c.CustomerID = opp.AccountID
WHERE c.IsCurrent = 1 -- Only current customer version (SCD Type 2)
GROUP BY
c.CustomerKey,
c.CompanyName,
c.Industry,
c.CreditRating;
Komponente 5: Metadata & Governance (Azure Purview)
Azure Purview fungiert als zentraler Data Catalog und Governance-Layer:
- Data Discovery: Durchsuchbarer Katalog aller Datasets im Data Hub
- Lineage Tracking: "Wo kommt dieser Wert her?" – vom Quellsystem bis zum Report
- Data Classification: Automatische Erkennung von PII, Finanzdaten, etc.
- Business Glossary: Zentrale Definitionen (z.B. "Was ist 'Leasingrate'?")
Beispiel: Lineage für "Total Contract Value" in Power BI Report
Power BI Report: "Management Dashboard"
↓ Measures: [Total Contract Value]
↓ Source: Gold.ContractKPIs.TotalContractValue
↓ Calculated in: gold_kpi_aggregation_pipeline
↓ Source: Silver.Contracts.LeaseRate
↓ Transformation: silver_contract_enrichment
↓ Source: Bronze.CoreBanking.Contracts.Leasingrate
↓ Ingestion: adf_core_banking_contracts_pipeline
↓ Original Source: CoreBankingDB.dbo.Vertraege.Leasingrate
Diese Transparenz ist kritisch für:
- Compliance: DSGVO-Anfragen (Wo sind Daten der Person X?)
- Impact Analysis: "Wenn ich Tabelle Y ändere, welche Reports brechen?"
- Trust: Business-User sehen, wo Zahlen herkommen
Modern Reporting mit Power BI: Von Daten zu Insights
Power BI ist die primäre Konsumenten-Schicht für Business-User bei FinanceLease AG. Die Integration mit dem Azure-basierten Data Hub ermöglicht performantes, self-service-fähiges Reporting mit Echtzeit-Updates.
Architektur: Power BI ↔ Data Hub Integration
┌──────────────────────────────────────────────────────────────────┐
│ POWER BI SERVICE │
│ │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ Management │ │ Risk & │ │ Operations │ │
│ │ Dashboard │ │ Compliance │ │ Dashboard │ │
│ └──────────────┘ └──────────────┘ └──────────────┘ │
│ │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ Sales │ │ Finance │ │ Fleet │ │
│ │ Analytics │ │ KPIs │ │ Management │ │
│ └──────────────┘ └──────────────┘ └──────────────┘ │
└─────────────┬────────────────────────────────────────────────────┘
│
▼
┌──────────────────────────────────────────────────────────────────┐
│ DATA CONNECTION OPTIONS │
│ │
│ Option 1: DirectQuery (Echtzeit, limitierte Transformationen) │
│ ┌─────────────────────────────────────────────────┐ │
│ │ Azure Synapse SQL Pool (Gold Layer) │ │
│ │ • Sub-second query performance │ │
│ │ • Always up-to-date │ │
│ │ • Row-Level Security enforced in DB │ │
│ └─────────────────────────────────────────────────┘ │
│ │
│ Option 2: Import (Fast, full DAX, scheduled refresh) │
│ ┌─────────────────────────────────────────────────┐ │
│ │ Azure Synapse / Pre-aggregated Tables │ │
│ │ • In-memory compression │ │
│ │ • Complex DAX calculations │ │
│ │ • Scheduled refresh (hourly/daily) │ │
│ └─────────────────────────────────────────────────┘ │
│ │
│ Option 3: Composite (Best of both) │
│ ┌─────────────────────────────────────────────────┐ │
│ │ Import for Dimensions, DirectQuery for Facts │ │
│ │ • Performance + Freshness │ │
│ └─────────────────────────────────────────────────┘ │
└──────────────────────────────────────────────────────────────────┘
Beispiel 1: Management Dashboard
Use Case: C-Level Executives benötigen täglich aktuelle KPIs zu Portfolio-Performance, Risk-Metriken, und Sales-Pipeline.
Power BI Dataset Configuration:
// Measures.dax - Zentrale Business-Metriken
// ==================== Portfolio Metrics ====================
[Total Active Contracts] :=
CALCULATE(
COUNTROWS(DimContract),
DimContract[Status] = "ACTIVE"
)
[Total Contract Value] :=
CALCULATE(
SUM(DimContract[LeaseRate]) * [Average Contract Duration],
DimContract[Status] IN {"ACTIVE", "ENDING_SOON"}
)
[Portfolio Growth YoY] :=
VAR CurrentYear = [Total Contract Value]
VAR PreviousYear =
CALCULATE(
[Total Contract Value],
SAMEPERIODLASTYEAR(DimDate[Date])
)
RETURN
DIVIDE(CurrentYear - PreviousYear, PreviousYear, 0)
// ==================== Payment Performance ====================
[Payment Success Rate] :=
VAR OnTimePayments =
CALCULATE(
COUNTROWS(FactContractTransaction),
FactContractTransaction[PaymentStatus] = "PAID_ONTIME"
)
VAR TotalPayments = COUNTROWS(FactContractTransaction)
RETURN
DIVIDE(OnTimePayments, TotalPayments, 0)
[Overdue Amount] :=
CALCULATE(
SUM(FactContractTransaction[Amount]),
FactContractTransaction[PaymentStatus] = "OVERDUE"
)
[Days Sales Outstanding (DSO)] :=
VAR AvgDailyRevenue =
CALCULATE(
[Total Contract Value] / 365,
DimContract[Status] = "ACTIVE"
)
VAR OutstandingReceivables =
CALCULATE(
SUM(FactContractTransaction[Amount]),
FactContractTransaction[PaymentStatus] IN {"PENDING", "OVERDUE"}
)
RETURN
DIVIDE(OutstandingReceivables, AvgDailyRevenue, 0)
// ==================== Risk Indicators ====================
[High-Risk Contracts %] :=
VAR HighRiskContracts =
CALCULATE(
COUNTROWS(DimContract),
DimContract[Status] = "ACTIVE",
RELATED(DimCustomer[CreditRating]) IN {"D", "E"}
)
VAR TotalActiveContracts = [Total Active Contracts]
RETURN
DIVIDE(HighRiskContracts, TotalActiveContracts, 0)
[Expected Loss] :=
// Simplified Expected Loss Calculation
VAR ExposureAtDefault =
SUMX(
FILTER(DimContract, DimContract[Status] = "ACTIVE"),
DimContract[LeaseRate] * DimContract[ContractDurationMonths]
)
VAR ProbabilityOfDefault = [High-Risk Contracts %] * 0.15 // Simplified
VAR LossGivenDefault = 0.45 // Industry standard
RETURN
ExposureAtDefault * ProbabilityOfDefault * LossGivenDefault
// ==================== Sales Pipeline ====================
[Total Pipeline Value] :=
CALCULATE(
SUM(DimOpportunity[Amount]),
DimOpportunity[Stage] <> "Closed Won",
DimOpportunity[Stage] <> "Closed Lost"
)
[Win Rate] :=
VAR WonOpportunities =
CALCULATE(
COUNTROWS(DimOpportunity),
DimOpportunity[Stage] = "Closed Won"
)
VAR TotalClosedOpportunities =
CALCULATE(
COUNTROWS(DimOpportunity),
DimOpportunity[Stage] IN {"Closed Won", "Closed Lost"}
)
RETURN
DIVIDE(WonOpportunities, TotalClosedOpportunities, 0)
[Average Deal Size] :=
CALCULATE(
AVERAGE(DimOpportunity[Amount]),
DimOpportunity[Stage] = "Closed Won"
)
// ==================== Trend Indicators ====================
[Contract Value Trend] :=
VAR CurrentValue = [Total Contract Value]
VAR PreviousMonthValue =
CALCULATE(
[Total Contract Value],
DATEADD(DimDate[Date], -1, MONTH)
)
RETURN
IF(
CurrentValue > PreviousMonthValue, "▲",
IF(CurrentValue < PreviousMonthValue, "▼", "→")
)
Dashboard Layout:
┌────────────────────────────────────────────────────────────────────┐
│ FINANCELEASE AG - MANAGEMENT DASHBOARD 🔄 Last: 08:30 UTC │
├────────────────────────────────────────────────────────────────────┤
│ │
│ 📊 KEY METRICS │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌───────────┐│
│ │ Total │ │ Contract │ │ Payment │ │ Portfolio │││
│ │ Contracts │ │ Value │ │ Success │ │ Growth │││
│ │ │ │ │ │ Rate │ │ YoY │││
│ │ 45,234 ▲ │ │ €2.1B ▲ │ │ 96.5% ▼ │ │ +12.3% ▲││
│ └─────────────┘ └─────────────┘ └─────────────┘ └───────────┘│
│ │
│ 📈 PORTFOLIO COMPOSITION │
│ ┌──────────────────────────────────────────────────────────────┐ │
│ │ [Stacked Bar Chart: Contract Value by Type & Month] │ │
│ │ ▓▓▓▓▓▓▓ Vehicle Leasing (60%) │ │
│ │ ▓▓▓▓ Equipment Leasing (25%) │ │
│ │ ▓▓ Mietkauf (15%) │ │
│ └──────────────────────────────────────────────────────────────┘ │
│ │
│ 💰 PAYMENT PERFORMANCE │
│ ┌──────────────────────────────────────────────────────────────┐ │
│ │ [Line Chart: Payment Success Rate Trend] │ │
│ │ Last 12 Months: Min 94.2%, Max 97.8%, Avg 96.1% │ │
│ └──────────────────────────────────────────────────────────────┘ │
│ │
│ ⚠️ RISK INDICATORS │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ High-Risk │ │ Expected │ │ DSO │ │
│ │ Contracts │ │ Loss │ │ Days │ │
│ │ 3.2% ▲ │ │ €12.5M ▲ │ │ 38 → │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
│ │
│ 🎯 SALES PIPELINE │
│ ┌──────────────────────────────────────────────────────────────┐ │
│ │ [Funnel Chart: Opportunities by Stage] │ │
│ │ Pipeline Value: €450M | Win Rate: 32% | Avg Deal: €850K │ │
│ └──────────────────────────────────────────────────────────────┘ │
└────────────────────────────────────────────────────────────────────┘
Beispiel 2: Risk & Compliance Dashboard
Use Case: Risk-Manager benötigen granulare Einblicke in Portfolio-Risiken, Bonitätsentwicklung, und Compliance-Metriken.
Row-Level Security (RLS) Implementation:
// RLS Role: "Risk Manager - Region DACH"
// Nur Verträge aus Deutschland, Österreich, Schweiz sichtbar
[RLS_Region_DACH] =
DimCustomer[Country] IN {"DE", "AT", "CH"}
DAX Measures:
// Vintage Analysis: Cohort-basierte Portfolio-Quality
[NPL Ratio by Vintage] :=
VAR VintageYear = SELECTEDVALUE(DimDate[Year])
RETURN
CALCULATE(
DIVIDE(
COUNTROWS(
FILTER(
DimContract,
DimContract[Status] = "DEFAULTED" &&
YEAR(DimContract[StartDate]) = VintageYear
)
),
COUNTROWS(
FILTER(
DimContract,
YEAR(DimContract[StartDate]) = VintageYear
)
),
0
)
)
// Credit Migration Matrix
[Credit Rating Downgrade %] :=
VAR CustomersWithDowngrade =
CALCULATE(
DISTINCTCOUNT(DimCustomer[CustomerKey]),
DimCustomer[CreditRatingChange] < 0,
DimDate[Date] >= TODAY() - 365
)
VAR TotalCustomers = DISTINCTCOUNT(DimCustomer[CustomerKey])
RETURN
DIVIDE(CustomersWithDowngrade, TotalCustomers, 0)
// IFRS 16 Lease Liability
[Total Lease Liability] :=
SUMX(
FILTER(DimContract, DimContract[Status] = "ACTIVE"),
VAR RemainingMonths = DimContract[ContractDurationMonths] -
DATEDIFF(DimContract[StartDate], TODAY(), MONTH)
VAR MonthlyRate = DimContract[LeaseRate]
VAR DiscountRate = 0.05 / 12 // 5% annual discount rate
RETURN
MonthlyRate * ((1 - POWER(1 + DiscountRate, -RemainingMonths)) / DiscountRate)
)
Beispiel 3: Self-Service Analytics für Sales
Use Case: Sales-Manager möchten ad-hoc Analysen zu ihren Opportunities erstellen, ohne IT-Abhängigkeiten.
Power BI Dataflow für Sales-Daten:
{
"name": "Sales_Analytics_Dataflow",
"entities": [
{
"name": "OpportunitiesEnriched",
"query": "
let
Source = AzureSynapse.Database('synapse-workspace', 'gold'),
OpportunitiesTable = Source{[Schema='gold', Item='DimOpportunity']}[Data],
AccountsTable = Source{[Schema='gold', Item='DimCustomer']}[Data],
// Merge Opportunities with Customer Data
MergedData = Table.NestedJoin(
OpportunitiesTable, {'AccountID'},
AccountsTable, {'CustomerID'},
'CustomerData', JoinKind.LeftOuter
),
// Expand Customer Fields
ExpandedData = Table.ExpandTableColumn(
MergedData, 'CustomerData',
{'CompanyName', 'Industry', 'EmployeeCount', 'CreditRating'},
{'Account_Name', 'Account_Industry', 'Account_Size', 'Credit_Rating'}
),
// Add Custom Columns
AddedColumns = Table.AddColumn(
ExpandedData, 'Deal_Size_Category',
each if [Amount] >= 1000000 then 'Large Deal (>1M)'
else if [Amount] >= 500000 then 'Medium Deal (500K-1M)'
else 'Small Deal (<500K)',
type text
),
// Calculate Days in Stage
AddedDaysInStage = Table.AddColumn(
AddedColumns, 'Days_In_Current_Stage',
each Duration.Days(DateTime.LocalNow() - [LastStageChangeDate]),
type number
),
// Add Risk Flag
AddedRiskFlag = Table.AddColumn(
AddedDaysInStage, 'At_Risk_Flag',
each if [Days_In_Current_Stage] > 90 and [Stage] <> 'Closed Won' and [Stage] <> 'Closed Lost'
then true else false,
type logical
)
in
AddedRiskFlag
"
}
]
}
Performance-Optimierung: Aggregation Tables
Für große Fact-Tabellen (FactContractTransaction hat 50 Mio. Zeilen) nutzen wir Automatic Aggregations:
-- Pre-aggregated table in Synapse
CREATE TABLE gold.FactContractTransaction_Agg_Daily
WITH (
DISTRIBUTION = HASH(DateKey),
CLUSTERED COLUMNSTORE INDEX
)
AS
SELECT
DateKey,
CustomerKey,
ContractKey,
TransactionType,
PaymentStatus,
-- Aggregated Metrics
COUNT(*) AS TransactionCount,
SUM(Amount) AS TotalAmount,
AVG(Amount) AS AvgAmount,
SUM(CASE WHEN DaysOverdue > 0 THEN 1 ELSE 0 END) AS OverdueCount,
SUM(LateFeeAmount) AS TotalLateFees
FROM gold.FactContractTransaction
GROUP BY
DateKey,
CustomerKey,
ContractKey,
TransactionType,
PaymentStatus;
Power BI erkennt automatisch diese Aggregation und nutzt sie für Queries auf Tag-Ebene (statt 50 Mio. Zeilen nur 500K).
Power BI Best Practices für Enterprise Data Hub
DirectQuery für Echtzeit, Import für Performance
- Management-Dashboards: DirectQuery (immer aktuell)
- Historische Analysen: Import (schneller)
Incremental Refresh für große Datasets
// Nur letzte 2 Jahre im Speicher, Rest in DirectQuery Table.SelectRows( Source, each [TransactionDate] >= Date.AddYears(DateTime.LocalNow(), -2) )Row-Level Security konsistent über alle Layers
- In Azure Synapse: SQL-basierte RLS
- In Power BI: DAX-basierte RLS
- Beide synchronisiert via Azure AD Groups
Zentralisierte Semantic Models
- Ein Dataset pro Business-Domain (Contracts, Sales, Risk)
- Mehrere Reports nutzen dasselbe Dataset
- "Single Version of Truth"
Monitoring & Alerting
- Power BI Premium: Query-Performance-Metrics
- Alert bei Report-Refresh-Failures
- Usage-Analytics: Welche Reports werden genutzt?
Data Science Lösungen auf dem Data Hub
Der Data Hub ist nicht nur für Reporting-Zwecke konzipiert, sondern auch als Fundament für Advanced Analytics und Machine Learning. Bei FinanceLease AG nutzen wir Azure Machine Learning und Databricks für verschiedene Data-Science-Use-Cases.
Use Case 1: Churn Prediction für Vertragsverlängerungen
Business-Problem: 30% der Leasingverträge werden nach Ablauf nicht verlängert. Können wir Kunden mit hohem Churn-Risk frühzeitig identifizieren und proaktiv ansprechen?
Data Science Ansatz:
# Azure ML Pipeline für Churn Prediction Model
from azureml.core import Workspace, Dataset, Experiment
from azureml.train.automl import AutoMLConfig
import pandas as pd
# Connect to Azure ML Workspace
ws = Workspace.from_config()
# Feature Engineering: Daten aus Gold Layer
query = """
SELECT
c.ContractKey,
c.ContractNumber,
c.ContractType,
c.LeaseRate,
c.ContractDurationMonths,
DATEDIFF(day, GETDATE(), c.EndDate) AS DaysUntilEnd,
-- Customer Features
cust.Industry,
cust.EmployeeCount,
cust.CreditRating,
-- Payment Behavior Features
SUM(CASE WHEN ft.PaymentStatus = 'PAID_ONTIME' THEN 1 ELSE 0 END) AS OnTimePaymentCount,
SUM(CASE WHEN ft.PaymentStatus = 'OVERDUE' THEN 1 ELSE 0 END) AS OverduePaymentCount,
AVG(ft.DaysOverdue) AS AvgDaysOverdue,
SUM(ft.LateFeeAmount) AS TotalLateFees,
-- Engagement Features
COUNT(DISTINCT sr.RequestID) AS ServiceRequestCount,
AVG(sr.ResolutionTimeHours) AS AvgResolutionTime,
-- Sales Activity Features
COUNT(DISTINCT opp.OpportunityID) AS NewOpportunitiesCount,
MAX(opp.LastActivityDate) AS LastSalesContact,
-- Target Variable
CASE
WHEN renewal.ContractID IS NOT NULL THEN 1 -- Renewed
ELSE 0 -- Churned
END AS Renewed
FROM gold.DimContract c
LEFT JOIN gold.DimCustomer cust ON c.CustomerID = cust.CustomerID
LEFT JOIN gold.FactContractTransaction ft ON c.ContractKey = ft.ContractKey
LEFT JOIN gold.FactServiceRequest sr ON c.ContractKey = sr.ContractKey
LEFT JOIN gold.DimOpportunity opp ON c.CustomerID = opp.AccountID
LEFT JOIN gold.DimContract renewal ON c.ContractNumber = renewal.PreviousContractNumber
WHERE
c.EndDate < GETDATE() -- Only expired contracts
AND c.EndDate >= DATEADD(year, -2, GETDATE()) -- Last 2 years
GROUP BY
c.ContractKey, c.ContractNumber, c.ContractType, c.LeaseRate,
c.ContractDurationMonths, c.EndDate, cust.Industry, cust.EmployeeCount,
cust.CreditRating, renewal.ContractID
"""
# Load data from Synapse
dataset = Dataset.Tabular.from_sql_query(
query,
query_timeout=600,
validate=True,
workspace=ws
)
df = dataset.to_pandas_dataframe()
# Train/Test Split
from sklearn.model_selection import train_test_split
X = df.drop(['ContractKey', 'ContractNumber', 'Renewed'], axis=1)
y = df['Renewed']
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.2, random_state=42, stratify=y
)
# AutoML Configuration
automl_config = AutoMLConfig(
task='classification',
primary_metric='AUC_weighted',
training_data=X_train,
label_column_name='Renewed',
n_cross_validations=5,
enable_early_stopping=True,
experiment_timeout_hours=2,
max_concurrent_iterations=4,
featurization='auto'
)
# Run Experiment
experiment = Experiment(ws, 'churn-prediction')
run = experiment.submit(automl_config, show_output=True)
# Get Best Model
best_run, fitted_model = run.get_output()
print(f"Best Model: {best_run.properties['model_name']}")
print(f"AUC: {best_run.get_metrics()['AUC_weighted']:.4f}")
# Register Model
from azureml.core.model import Model
model = Model.register(
workspace=ws,
model_path='outputs/model.pkl',
model_name='churn_prediction_model',
tags={'type': 'classification', 'framework': 'scikit-learn'},
description='Predicts contract renewal likelihood'
)
Feature Importance:
Top Features für Churn Prediction:
1. AvgDaysOverdue (0.28) - Zahlungsverzug ist stärkster Indikator
2. ServiceRequestCount (0.18) - Viele Support-Anfragen = Unzufriedenheit
3. LastSalesContact (0.15) - Kürzlicher Kontakt erhöht Retention
4. TotalLateFees (0.12) - Finanzielle Friction
5. EmployeeCount (0.09) - Unternehmensgröße korreliert mit Renewal-Rate
Deployment & Scoring:
# Real-time Scoring Endpoint
from azureml.core.webservice import AciWebservice, Webservice
from azureml.core.model import InferenceConfig
inference_config = InferenceConfig(
entry_script='score.py',
environment=env
)
aci_config = AciWebservice.deploy_configuration(
cpu_cores=1,
memory_gb=2,
auth_enabled=True
)
service = Model.deploy(
workspace=ws,
name='churn-prediction-service',
models=[model],
inference_config=inference_config,
deployment_config=aci_config
)
service.wait_for_deployment(show_output=True)
print(f"Scoring URI: {service.scoring_uri}")
# Batch Scoring Pipeline (täglich für alle auslaufenden Verträge)
from azureml.pipeline.core import Pipeline, PipelineData
from azureml.pipeline.steps import PythonScriptStep
# Score contracts ending in next 90 days
batch_score_step = PythonScriptStep(
name='batch_score_churn',
script_name='batch_score.py',
arguments=[
'--model-name', 'churn_prediction_model',
'--output-table', 'gold.ChurnPredictions'
],
compute_target='ml-compute-cluster',
allow_reuse=False
)
pipeline = Pipeline(workspace=ws, steps=[batch_score_step])
pipeline_run = pipeline.submit(experiment_name='churn-batch-scoring')
Business Impact:
- Frühzeitiges Eingreifen bei High-Risk-Kunden (Churn-Score > 0.7)
- Sales erhält täglich priorisierte Liste für proaktive Kontakte
- Retention-Rate stieg von 70% auf 82% (12 Prozentpunkte)
- ROI: €4.5M zusätzliche Vertragsvolumen/Jahr
Use Case 2: Dynamische Restwert-Prognosen für Leasing-Assets
Business-Problem: Restwert-Prognosen von externen Anbietern (DAT/Schwacke) sind generisch. Können wir bessere Prognosen durch eigene Daten trainieren?
Ansatz: Gradient Boosting Regression
# Feature Engineering für Fahrzeug-Restwert-Prognose
import databricks.koalas as ks
from pyspark.sql import functions as F
# Read from Silver Layer
vehicles_df = spark.read.format("delta").load("abfss://silver@datalake/fleet_management/vehicles")
telemetry_df = spark.read.format("delta").load("abfss://silver@datalake/fleet_management/telemetry")
# Aggregate Telemetry Features
telemetry_agg = telemetry_df.groupBy("vehicle_id").agg(
F.max("mileage").alias("current_mileage"),
F.avg("mileage").alias("avg_daily_mileage"),
F.count("*").alias("telemetry_events_count"),
F.stddev("fuel_level").alias("fuel_consumption_variance")
)
# Join and create features
features_df = vehicles_df.join(telemetry_agg, "vehicle_id", "left") \
.withColumn("vehicle_age_months", F.months_between(F.current_date(), "first_registration_date")) \
.withColumn("mileage_per_month", F.col("current_mileage") / F.col("vehicle_age_months")) \
.withColumn("brand_segment", F.when(F.col("brand").isin(["Mercedes", "BMW", "Audi"]), "Premium")
.otherwise("Standard"))
# Target: Actual Sale Price from historical data
target_df = spark.read.format("delta").load("abfss://silver@datalake/asset_sales")
# Train XGBoost Model
from xgboost import XGBRegressor
from sklearn.metrics import mean_absolute_error, r2_score
model = XGBRegressor(
n_estimators=500,
learning_rate=0.05,
max_depth=7,
subsample=0.8,
colsample_bytree=0.8,
random_state=42
)
model.fit(X_train, y_train)
# Predictions
y_pred = model.predict(X_test)
print(f"MAE: €{mean_absolute_error(y_test, y_pred):.2f}")
print(f"R²: {r2_score(y_test, y_pred):.4f}")
# Results:
# MAE: €850 (DAT/Schwacke: €1,450) - 41% bessere Genauigkeit
# R²: 0.91
Use Case 3: Anomalie-Erkennung bei Zahlungseingängen
Use Case: Frühzeitige Erkennung von abnormalen Zahlungsmustern zur Fraud-Detection.
# Isolation Forest für Anomalie-Erkennung
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler
# Features: Payment Behavior Patterns
features = [
'payment_amount',
'days_since_last_payment',
'payment_hour', # Ungewöhnliche Zahlungszeiten
'amount_deviation_from_avg',
'payment_frequency_last_30_days'
]
scaler = StandardScaler()
X_scaled = scaler.fit_transform(df[features])
# Train Isolation Forest
iso_forest = IsolationForest(
contamination=0.05, # Erwarte 5% Anomalien
random_state=42
)
df['anomaly_score'] = iso_forest.fit_predict(X_scaled)
df['anomaly_probability'] = iso_forest.score_samples(X_scaled)
# Anomalien zur manuellen Review
anomalies = df[df['anomaly_score'] == -1].sort_values('anomaly_probability')
# Write back to Data Hub
anomalies.to_sql(
'FactPaymentAnomalies',
con=synapse_connection,
schema='gold',
if_exists='append',
index=False
)
API-Integration für Dritt-Applikationen
Der Data Hub muss nicht nur Daten konsumieren, sondern auch für externe Systeme zugänglich machen. Bei FinanceLease AG nutzen wir Azure API Management als Gateway.
Architektur: API Layer
┌─────────────────────────────────────────────────────────────┐
│ External Consumers │
│ • Mobile App • Partner Portal • Third-Party Tools │
└───────────────┬─────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ Azure API Management (APIM) │
│ • Authentication (OAuth 2.0, API Keys) │
│ • Rate Limiting (1000 req/hour per consumer) │
│ • Caching (Redis) │
│ • Logging & Analytics │
└───────────────┬─────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ Azure Functions (REST API Endpoints) │
│ • GET /api/contracts/{id} │
│ • GET /api/customers/{id}/contracts │
│ • POST /api/contracts/{id}/payments │
│ • GET /api/analytics/kpis │
└───────────────┬─────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ Azure Synapse (Gold Layer) / Cosmos DB │
│ • Read-Optimized Views │
│ • Row-Level Security enforced │
└─────────────────────────────────────────────────────────────┘
Beispiel-API: Contract Details
// Azure Function: Get Contract Details
[FunctionName("GetContractDetails")]
public async Task<IActionResult> GetContractDetails(
[HttpTrigger(AuthorizationLevel.Function, "get",
Route = "contracts/{contractNumber}")] HttpRequest req,
string contractNumber,
ILogger log)
{
// Validate authentication
if (!ValidateApiKey(req.Headers["X-API-Key"]))
{
return new UnauthorizedResult();
}
// Row-Level Security: Check user permissions
var userContext = GetUserContext(req.Headers["Authorization"]);
if (!await HasAccessToContract(userContext, contractNumber))
{
return new ForbidResult();
}
// Query Synapse
var query = @"
SELECT
c.ContractNumber,
c.ContractType,
c.LeaseRate,
c.StartDate,
c.EndDate,
c.Status,
cust.CompanyName,
cust.Industry,
v.VIN,
v.Brand,
v.Model,
ISNULL(payments.TotalPaid, 0) AS TotalPaid,
ISNULL(payments.OutstandingAmount, 0) AS OutstandingAmount
FROM gold.DimContract c
INNER JOIN gold.DimCustomer cust ON c.CustomerID = cust.CustomerID
LEFT JOIN gold.DimVehicle v ON c.AssetID = v.VehicleID
LEFT JOIN (
SELECT
ContractKey,
SUM(CASE WHEN PaymentStatus = 'PAID_ONTIME' THEN Amount ELSE 0 END) AS TotalPaid,
SUM(CASE WHEN PaymentStatus = 'PENDING' THEN Amount ELSE 0 END) AS OutstandingAmount
FROM gold.FactContractTransaction
GROUP BY ContractKey
) payments ON c.ContractKey = payments.ContractKey
WHERE c.ContractNumber = @ContractNumber
";
using var connection = new SqlConnection(_synapseConnectionString);
var contract = await connection.QueryFirstOrDefaultAsync<ContractDetailDto>(
query,
new { ContractNumber = contractNumber }
);
if (contract == null)
{
return new NotFoundResult();
}
// Add enrichments from ML models
contract.ChurnProbability = await GetChurnPrediction(contractNumber);
contract.RecommendedActions = await GetRecommendations(contract);
return new OkObjectResult(contract);
}
OpenAPI Specification:
openapi: 3.0.0
info:
title: FinanceLease Data Hub API
version: 1.0.0
description: Access to contract, customer, and analytics data
security:
- ApiKeyAuth: []
- OAuth2: [read:contracts, write:payments]
paths:
/api/contracts/{contractNumber}:
get:
summary: Get contract details
parameters:
- name: contractNumber
in: path
required: true
schema:
type: string
responses:
'200':
description: Contract found
content:
application/json:
schema:
$ref: '#/components/schemas/Contract'
'404':
description: Contract not found
'401':
description: Unauthorized
/api/analytics/kpis:
get:
summary: Get real-time KPIs
parameters:
- name: date
in: query
schema:
type: string
format: date
responses:
'200':
description: KPIs retrieved
content:
application/json:
schema:
$ref: '#/components/schemas/KPIs'
components:
securitySchemes:
ApiKeyAuth:
type: apiKey
in: header
name: X-API-Key
schemas:
Contract:
type: object
properties:
contractNumber:
type: string
contractType:
type: string
enum: [Leasing, Mietkauf]
leaseRate:
type: number
format: decimal
startDate:
type: string
format: date
endDate:
type: string
format: date
status:
type: string
enum: [ACTIVE, ENDING_SOON, EXPIRED]
totalPaid:
type: number
outstandingAmount:
type: number
churnProbability:
type: number
format: float
Rate Limiting & Caching
<!-- APIM Policy für Rate Limiting und Caching -->
<policies>
<inbound>
<!-- Authentication -->
<validate-jwt header-name="Authorization" require-scheme="Bearer">
<issuer-signing-keys>
<key>{{jwt-signing-key}}</key>
</issuer-signing-keys>
<audiences>
<audience>api://financelease-datahub</audience>
</audiences>
</validate-jwt>
<!-- Rate Limiting: 1000 requests/hour per subscription -->
<rate-limit-by-key calls="1000" renewal-period="3600"
counter-key="@(context.Subscription.Id)" />
<!-- Caching für häufig abgerufene Endpoints -->
<cache-lookup vary-by-developer="true" vary-by-developer-groups="false">
<vary-by-query-parameter>date</vary-by-query-parameter>
</cache-lookup>
<!-- Request Transformation -->
<set-backend-service base-url="https://datahub-api.azurewebsites.net" />
</inbound>
<backend>
<forward-request timeout="30" />
</backend>
<outbound>
<!-- Cache successful responses für 5 Minuten -->
<cache-store duration="300" />
<!-- Add Headers -->
<set-header name="X-Data-Freshness" exists-action="override">
<value>@(DateTime.UtcNow.ToString("o"))</value>
</set-header>
</outbound>
<on-error>
<!-- Error Handling -->
<set-body>@{
return new JObject(
new JProperty("error", context.LastError.Message),
new JProperty("timestamp", DateTime.UtcNow)
).ToString();
}</set-body>
</on-error>
</policies>
Zugriffssicherheit: Multi-Layer Security Model
Zugriffssicherheit ist bei Financial-Daten nicht optional. FinanceLease AG implementiert ein Defense-in-Depth-Modell mit mehreren Sicherheitsschichten.
Layer 1: Network Security
┌─────────────────────────────────────────────────────────────┐
│ Azure Virtual Network │
│ ┌────────────────┐ ┌────────────────┐ ┌───────────────┐ │
│ │ Subnet: Data │ │ Subnet: Compute│ │ Subnet: API │ │
│ │ (Synapse, SQL) │ │ (Databricks) │ │ (APIM, Funcs) │ │
│ └────────────────┘ └────────────────┘ └───────────────┘ │
│ │
│ • Private Endpoints für alle Services │
│ • Network Security Groups (NSGs) mit Deny-by-Default │
│ • Azure Firewall für Outbound-Traffic │
└─────────────────────────────────────────────────────────────┘
Public Internet
↓ (nur via Azure Front Door + WAF)
[API Management]
↓
Private Network (keine direkte Internet-Exposition)
Layer 2: Identity & Access Management (Azure AD)
// Role-Based Access Control (RBAC)
// Definierte Rollen:
public enum DataHubRole
{
DataEngineer, // Voller Zugriff auf alle Layers
DataAnalyst, // Read auf Silver/Gold
BusinessUser, // Read auf Gold (via Power BI/API)
DataSci entist, // Read Silver/Gold + ML Workspace
AuditUser // Read-Only für Compliance
}
// Azure AD Group-Mappings
var roleGroups = new Dictionary<DataHubRole, string>
{
{ DataHubRole.DataEngineer, "SG-DataHub-Engineers" },
{ DataHubRole.DataAnalyst, "SG-DataHub-Analysts" },
{ DataHubRole.BusinessUser, "SG-DataHub-BusinessUsers" },
{ DataHubRole.DataScientist, "SG-DataHub-DataScientists" },
{ DataHubRole.AuditUser, "SG-DataHub-Auditors" }
};
Layer 3: Data-Level Security (Row-Level & Column-Level)
-- SQL Server Row-Level Security in Synapse
CREATE SCHEMA Security;
GO
CREATE FUNCTION Security.fn_SecurityPredicate(@CustomerCountry NVARCHAR(2))
RETURNS TABLE
WITH SCHEMABINDING
AS
RETURN
SELECT 1 AS Result
WHERE
-- Data Engineers sehen alles
IS_MEMBER('SG-DataHub-Engineers') = 1
OR
-- Risk Manager nur eigene Region
(IS_MEMBER('SG-Risk-Manager-DACH') = 1 AND @CustomerCountry IN ('DE', 'AT', 'CH'))
OR
(IS_MEMBER('SG-Risk-Manager-BENELUX') = 1 AND @CustomerCountry IN ('BE', 'NL', 'LU'))
OR
-- Sales nur eigene Kunden
(IS_MEMBER('SG-Sales-Team') = 1 AND @CustomerCountry IN (
SELECT Country FROM gold.DimCustomer
WHERE SalesRepID = CAST(SESSION_CONTEXT(N'SalesRepID') AS INT)
));
GO
CREATE SECURITY POLICY Security.ContractSecurityPolicy
ADD FILTER PREDICATE Security.fn_SecurityPredicate(CustomerCountry)
ON gold.DimContract,
ADD BLOCK PREDICATE Security.fn_SecurityPredicate(CustomerCountry)
ON gold.DimContract AFTER INSERT;
GO
-- Column-Level Security: Masking sensibler Daten
ALTER TABLE gold.DimCustomer
ALTER COLUMN TaxID ADD MASKED WITH (FUNCTION = 'partial(2, "XXXX", 2)');
-- Ergebnis für Non-Privileged Users: "12XXXX89" statt "12345689"
Layer 4: Data Encryption
# Encryption at Rest and in Transit
Encryption at Rest:
Azure Data Lake:
- Microsoft-managed keys (Standard)
- Customer-managed keys via Azure Key Vault (für Gold Layer)
Azure Synapse:
- Transparent Data Encryption (TDE) enabled
Azure SQL:
- Always Encrypted für hochsensible Spalten (z.B. IBAN)
Encryption in Transit:
- TLS 1.2+ für alle Verbindungen
- Azure Private Link für Service-to-Service Communication
- VPN/ExpressRoute für On-Premise Konnektivität
Layer 5: Audit Logging
-- Audit Log Tabelle in Synapse
CREATE TABLE audit.DataAccessLog (
LogID BIGINT IDENTITY(1,1) PRIMARY KEY,
UserPrincipalName NVARCHAR(200),
UserRole NVARCHAR(100),
AccessedTable NVARCHAR(200),
AccessType NVARCHAR(50), -- SELECT, INSERT, UPDATE, DELETE
RowCount INT,
QueryText NVARCHAR(MAX),
ClientIPAddress NVARCHAR(50),
ApplicationName NVARCHAR(200),
Timestamp DATETIME2 DEFAULT GETUTCDATE()
);
-- Azure Monitor Integration
-- Alle Zugriffe auf Gold Layer werden geloggt und in Azure Sentinel analysiert
-- Alerts bei:
-- - Ungewöhnlich hohe Anzahl von Queries (mögliches Data Exfiltration)
-- - Zugriff außerhalb Arbeitszeiten
-- - Fehlgeschlagene Authentication-Versuche
Migration bestehender Data Warehouses
Viele Organisationen haben bereits ein Data Warehouse – oft On-Premise SQL Server, Oracle, oder ältere Cloud-Lösungen. Die Migration zu einem modernen Data Hub ist ein schrittweiser Prozess, kein Big-Bang.
Migrations-Strategie: Strangler Fig Pattern
Statt das alte DWH abzuschalten und alles neu zu bauen, migrieren wir inkrementell:
Phase 1: Parallel Betrieb
┌────────────┐ ┌────────────┐
│ Legacy │ │ New Data │
│ DWH │────────→│ Hub │
│ (On-Prem) │ Replika │ (Azure) │
└────────────┘ └────────────┘
↓ ↓
Old Reports New Reports
(wird sukzessive migriert)
Phase 2: Hybrid (90% neuer Hub, 10% Legacy)
┌────────────┐
│ New Data │
┌────────────┐───→│ Hub │
│ Legacy │ │ (Azure) │
│ DWH │ └────────────┘
│ (minimiert)│ ↓
└────────────┘ Most Reports
Phase 3: Full Migration
┌────────────┐
│ New Data │
[Legacy DWH]────→ │ Hub │
decommissioned │ (Azure) │
└────────────┘
↓
All Reports
Migrations-Schritte
Schritt 1: Assessment
# Automated Analysis des bestehenden DWH
import pyodbc
import pandas as pd
# Connect to Legacy DWH
legacy_conn = pyodbc.connect('DSN=LegacyDWH;UID=user;PWD=pass')
# Analyse: Tabellen-Größen
tables_query = """
SELECT
s.name AS SchemaName,
t.name AS TableName,
SUM(p.rows) AS RowCount,
SUM(a.total_pages) * 8 / 1024 AS SizeMB,
MAX(st.last_user_update) AS LastModified
FROM sys.tables t
INNER JOIN sys.schemas s ON t.schema_id = s.schema_id
INNER JOIN sys.indexes i ON t.object_id = i.object_id
INNER JOIN sys.partitions p ON i.object_id = p.object_id AND i.index_id = p.index_id
INNER JOIN sys.allocation_units a ON p.partition_id = a.container_id
LEFT JOIN sys.dm_db_index_usage_stats st ON t.object_id = st.object_id
GROUP BY s.name, t.name
ORDER BY SizeMB DESC
"""
tables_df = pd.read_sql(tables_query, legacy_conn)
# Priorisierung
tables_df['Priority'] = tables_df.apply(lambda row:
'HIGH' if row['RowCount'] > 1000000 or row['SizeMB'] > 1000 else
'MEDIUM' if row['RowCount'] > 100000 else
'LOW', axis=1
)
print(f"Total Tables: {len(tables_df)}")
print(f"Total Size: {tables_df['SizeMB'].sum():.2f} MB")
print(f"\nPriority Distribution:")
print(tables_df['Priority'].value_counts())
Schritt 2: Schema-Migration (DDL)
-- Automated Schema Conversion
-- Tool: Azure Database Migration Service (DMS) oder custom scripts
-- Beispiel: Legacy DWH Fact Table
-- Source (Legacy):
CREATE TABLE dbo.FactSales (
SalesID INT PRIMARY KEY,
DateKey INT,
CustomerKey INT,
Amount DECIMAL(18,2),
Quantity INT,
LoadDate DATETIME DEFAULT GETDATE()
);
-- Target (Synapse Dedicated SQL Pool):
CREATE TABLE gold.FactSales (
SalesID INT NOT NULL,
DateKey INT NOT NULL,
CustomerKey INT NOT NULL,
Amount DECIMAL(18,2),
Quantity INT,
LoadDate DATETIME2,
-- Synapse-spezifische Optimierungen
PRIMARY KEY NONCLUSTERED (SalesID) NOT ENFORCED
)
WITH (
DISTRIBUTION = HASH(CustomerKey), -- Optimiert für Customer-Joins
CLUSTERED COLUMNSTORE INDEX
);
Schritt 3: Data Migration
# Azure Data Factory Pipeline für Data Migration
from azure.identity import DefaultAzureCredential
from azure.mgmt.datafactory import DataFactoryManagementClient
from azure.mgmt.datafactory.models import *
credential = DefaultAzureCredential()
adf_client = DataFactoryManagementClient(credential, subscription_id)
# Create Pipeline für jede Tabelle
for table in tables_df[tables_df['Priority'] == 'HIGH'].itertuples():
pipeline_name = f"Migrate_{table.SchemaName}_{table.TableName}"
# Copy Activity
copy_activity = CopyActivity(
name=f"Copy_{table.TableName}",
inputs=[
DatasetReference(reference_name='LegacyDWH_Generic', parameters={
'schemaName': table.SchemaName,
'tableName': table.TableName
})
],
outputs=[
DatasetReference(reference_name='Synapse_Generic', parameters={
'schemaName': 'gold',
'tableName': table.TableName
})
],
source=SqlSource(),
sink=SqlDWSink(
pre_copy_script=f"TRUNCATE TABLE gold.{table.TableName}",
write_batch_size=10000,
write_batch_timeout='00:10:00'
),
enable_staging=True, # Nutze Azure Blob als Staging
staging_settings=StagingSettings(
linked_service_name='AzureBlobStorage',
path='staging'
)
)
pipeline = PipelineResource(
activities=[copy_activity]
)
adf_client.pipelines.create_or_update(
resource_group_name,
data_factory_name,
pipeline_name,
pipeline
)
print(f"Created pipeline: {pipeline_name}")
Schritt 4: Reconciliation & Validation
-- Data Validation: Row Counts müssen matched
SELECT
'Legacy' AS Source,
COUNT(*) AS RowCount,
SUM(Amount) AS TotalAmount,
MAX(LoadDate) AS LatestRecord
FROM LegacyDWH.dbo.FactSales
UNION ALL
SELECT
'Synapse' AS Source,
COUNT(*) AS RowCount,
SUM(Amount) AS TotalAmount,
MAX(LoadDate) AS LatestRecord
FROM gold.FactSales;
-- Expected: Row Counts und TotalAmount identisch
Schritt 5: Cutover & Decommissioning
Cutover-Checklist:
✓ Alle High-Priority Tables migriert und validiert
✓ Alle Legacy-Reports auf neuen Data Hub migriert
✓ Performance-Tests: Queries ≤ Legacy-Performance
✓ User-Acceptance-Tests durchgeführt
✓ Rollback-Plan dokumentiert
✓ Go-Live-Communication an Stakeholder
Post-Cutover:
Week 1-2: Parallel Run (Legacy DWH read-only, neue Reports aktiv)
Week 3-4: Monitoring, Hotfix-Phase
Month 2: Legacy DWH Decommissioning
Month 3: Hardware Return / Cost Savings realized
Migrations-Herausforderungen & Lessons Learned
Challenge 1: Query-Kompatibilität
- Legacy T-SQL Queries nutzen oft Synapse-inkompatible Features
- Solution: Query-Rewrite oder Compatibility Layer (Views)
Challenge 2: Performance-Regression
- Einige Queries langsamer auf Synapse als On-Premise (Netzwerk-Latenz)
- Solution: Materialized Views, Aggressive Caching, Query-Optimierung
Challenge 3: Change Management
- User-Widerstand gegen "neue" Tools
- Solution: Intensive Trainings, Champions-Program, Quick Wins zeigen
Challenge 4: Downtime-Fenster
- 24/7-Betrieb erlaubt kein Wartungsfenster
- Solution: Blue-Green-Deployment, schrittweise Migration
Fazit & Best Practices
Der Aufbau eines modernen Data Hubs für ein Leasing-Unternehmen ist ein strategisches, multi-dimensionales Projekt, das weit über die reine Technologie hinausgeht. Basierend auf der Erfahrung mit FinanceLease AG und ähnlichen Projekten hier die wichtigsten Erkenntnisse:
Architektur:
- Medallion-Architektur (Bronze-Silver-Gold) als bewährtes Pattern
- Delta Lake für ACID-Transaktionen und Time-Travel
- Synapse Dedicated SQL Pool für performantes BI-Reporting
- Separation of Concerns: Ingestion ≠ Processing ≠ Consumption
Datenqualität: 5. Data Quality ist kein einmaliges Projekt, sondern kontinuierlicher Prozess 6. Automatisierte Data Quality Checks in jeder Layer-Transformation 7. Data Lineage und Governance (Azure Purview) von Tag 1
Analytics & ML: 8. Data Hub als Fundament für Advanced Analytics, nicht nur Reporting 9. Feature Store für reproduzierbare ML-Features 10. MLOps-Praktiken: Versionierung, CI/CD für Models
Security: 11. Defense-in-Depth: Network, Identity, Data-Level Security 12. Encryption at Rest und in Transit ist Standard, kein Optional 13. Audit Logging für Compliance (DSGVO, BaFin)
Migration: 14. Strangler Fig Pattern statt Big-Bang 15. Automatisierte Assessment-Tools nutzen 16. Mindestens 20% Puffer für unerwartete Komplexität
Organisation: 17. Cross-funktionale Teams (Data Engineers, Analysts, Business) 18. Dedicated Data Product Owner mit Business-Background 19. Self-Service als Ziel, aber mit klaren Governance-Guard-Rails
Messbare Erfolge bei FinanceLease AG:
- Time-to-Insight: 2 Wochen → 2 Stunden (95% Reduktion)
- Data Quality: 78% → 96% korrekte Daten
- Analytics-Adoption: 12% → 67% der Business-User nutzen aktiv Reports
- Cost: €420K Legacy-DWH → €180K Cloud (57% Reduktion)
- Business Impact: €4.5M zusätzlicher Revenue durch Churn-Prevention
Der moderne Data Hub ist kein Projekt, sondern ein Produkt – ein lebendes System, das kontinuierlich weiterentwickelt wird und messbaren Business-Value liefert.