CyberFlow Logo CyberFlow BLOG
Soc L3 Cloud Security

Python Pandas ile Log Analizi

✍️ Ahmet BİRKAN 📂 Soc L3 Cloud Security

Python Pandas ile Log Analizi konusunu SOC L3 - İleri Tehdit Avcılığı - Veri Madenciliği ve Modelleme baglaminda blog formatinda ogrenin. Temel akis, kavram eslestirmeleri ve analiz mantigi tek bir yapida birlestirildi.

Python Pandas ile Log Analizi

Pandas ile büyük log dosyası okuma ve ilk keşif analizi. Bellek optimizasyonu için dtype belirleme ve chunk okuma kritiktir: import pandas as pd import numpy as np dtype_map = { 'src_ip': 'category', 'dst_ip': 'category', 'action': 'category', 'protocol': 'category', 'dst_port': 'uint16', 'bytes_in': 'uint32', 'bytes_out': 'uint32', 'duration': 'float32' } for chunk in pd.read_csv( 'firewall_30days.log', sep='|', dtype=dtype_map, parse_dates=['timestamp'], chunksize=100000 ): print(f'Bellek kullanımı: {df.memory_usage(deep=True).sum()/1024**2:.1f} MB') print(df.dtypes) print(df.describe())

Giris ve Temel Akis

Pandas ile büyük log dosyası okuma ve ilk keşif analizi. Bellek optimizasyonu için dtype belirleme ve chunk okuma kritiktir: import pandas as pd import numpy as np dtype_map = { 'src_ip': 'category', 'dst_ip': 'category', 'action': 'category', 'protocol': 'category', 'dst_port': 'uint16', 'bytes_in': 'uint32', 'bytes_out': 'uint32', 'duration': 'float32' } for chunk in pd.read_csv( 'firewall_30days.log', sep='|', dtype=dtype_map, parse_dates=['timestamp'], chunksize=100000 ): print(f'Bellek kullanımı: {df.memory_usage(deep=True).sum()/1024**2:.1f} MB') print(df.dtypes) print(df.describe())

Bu bölümün pratik akışı şu sırayla ilerler:

  • dtype_map = {'src_ip':'category','dst_ip':'category','dst_port':'uint16','bytes_out':'uint32'}
  • chunks = []
  • for chunk in pd.read_csv('firewall_30days.log', sep='|', dtype=dtype_map, parse_dates=['timestamp'], chunksize=100000):
  • filtered = chunk[chunk['action'] == 'BLOCK']
  • chunks.append(filtered)
  • df = pd.concat(chunks, ignore_index=True)
  • print(f'Toplam kayıt: {len(df):,}')
  • print(f'Bellek: {df.memory_usage(deep=True).sum()/1024**2:.1f} MB')

Temel Kavram Eslesmeleri

Pandas veri tipi seçimi bellek kullanımını ve işlem hızını doğrudan etkiler. SOC log analizinde doğru dtype büyük fark yaratır. Örnek bellek karşılaştırması: - src_ip object: 100MB → category: 8MB - dst_port int64: 80MB → uint16: 10MB - bytes_out float64: 80MB → uint32: 40MB - timestamp object: 120MB → datetime64: 40MB

  • category: Tekrar eden string alanlar için — src_ip, action, protocol
  • uint16: 0-65535 arası port numaraları için
  • float32: Hassasiyet kaybı kabul edilebilir süre ve oran için
  • datetime64: Zaman serisi işlemi gereken timestamp alanı için

Ilk Cekirdek Kavram

Bu bölümde öne çıkan çekirdek kavram fill_value olarak verilir. Pandas ile Windows Event Log analizi. EventID bazlı saldırı örüntüsü tespiti ve pivot table korelasyonu: import pandas as pd import numpy as np df_evt = pd.read_csv('windows_events.csv', parse_dates=['TimeCreated']) df_evt['hour'] = df_evt['TimeCreated'].dt.hour df_evt['dayofweek'] = df_evt['TimeCreated'].dt.dayofweek df_evt['is_off_hours'] = ((df_evt['hour'] < 7) | (df_evt['hour'] > 20)).astype(int) pivot = df_evt[df_evt['EventID'].isin([4624,4625,4648,4768,4769])].pivot_table( values='EventRecordID', index='SubjectUserName', columns='EventID', aggfunc='count', fill_value=0 ) pivot.columns = [f'EventID_{c}' for c in pivot.columns] pivot['fail_ratio'] = pivot.get('EventID_4625',0) / (pivot.get('EventID_4624',0) + pivot.get('EventID_4625',0) + 0.001) pivot['kerberoast_score'] = pivot.get('EventID_4769',0) suspicious = pivot[(pivot['fail_ratio'] > 0.5) | (pivot['kerberoast_score'] > 10)] print(suspicious.sort_values('fail_ratio', ascending=False).head(20))

Arac, Komut veya Inceleme Akisi

Pandas ile NetFlow verisi üzerinde port tarama tespiti. Vektörize operasyonlar ve groupby ile yüksek performanslı analiz: import pandas as pd import numpy as np df_flow = pd.read_parquet('netflow_24h.parquet') df_flow['timestamp'] = pd.to_datetime(df_flow['timestamp']) scan_features = df_flow.groupby(['src_ip','time_window']).agg( unique_dst_ips=('dst_ip','nunique'), unique_dst_ports=('dst_port','nunique'), total_conns=('dst_ip','count'), total_bytes=('bytes','sum'), mean_duration=('duration','mean'), syn_only_ratio=('tcp_flags', lambda x: (x=='S').mean()) ).reset_index() scan_features['conn_per_min'] = scan_features['total_conns'] / 5 print(scanners.groupby('src_ip')[['unique_dst_ips','unique_dst_ports','total_conns']].max().sort_values('total_conns',ascending=False).head(10))

Bu bölümün pratik akışı şu sırayla ilerler:

  • df_flow['time_window'] = df_flow['timestamp'].dt.floor('5min')
  • scan_features = df_flow.groupby(['src_ip','time_window']).agg(unique_dst_ips=('dst_ip','nunique'), unique_dst_ports=('dst_port','nunique'), total_conns=('dst_ip','count')).reset_index()
  • scan_features['is_horizontal'] = (scan_features['unique_dst_ips']>50) & (scan_features['unique_dst_ports']<5)
  • scan_features['is_vertical'] = (scan_features['unique_dst_ips']<5) & (scan_features['unique_dst_ports']>50)
  • scan_features['is_full_scan'] = (scan_features['unique_dst_ips']>20) & (scan_features['unique_dst_ports']>20)
  • scanners = scan_features[scan_features['is_horizontal']|scan_features['is_vertical']|scan_features['is_full_scan']]
  • print(f'Tarama tespit: {scanners["src_ip"].nunique()} kaynak IP')

Kanit ve Bilesen Iliskileri

Pandas performans optimizasyonu büyük SOC log veri setlerinde kritiktir. Yanlış yaklaşım saatlik analizi dakikaya indirebilir. Örnek performans karşılaştırması: - df.iterrows(): 10M kayıt için 300 saniye - df.apply(): 10M kayıt için 30 saniye - Vektörize numpy: 10M kayıt için 0.5 saniye - Parquet formatı: CSV'den 5-10x hızlı okuma

  • Vektörize numpy ops: Koşullu alan hesaplama — iterrows yerine kullan
  • Parquet formatı: Büyük log arşivini hızlı okumak için CSV yerine
  • query() metodu: Büyük DataFrame'de okunabilir ve hızlı filtreleme
  • category dtype: Tekrar eden string alanları bellek verimli depola

Ikincil Odak Noktasi

Bu bölümde öne çıkan çekirdek kavram left olarak verilir. Pandas merge ve join ile çok kaynaklı log korelasyonu. Firewall, proxy ve DNS loglarını tek analizde birleştirme: import pandas as pd df_fw = pd.read_parquet('firewall.parquet') df_proxy = pd.read_parquet('proxy.parquet') df_dns = pd.read_parquet('dns.parquet') df_fw['time_key'] = df_fw['timestamp'].dt.floor('1min') df_proxy['time_key'] = df_proxy['timestamp'].dt.floor('1min') df_dns['time_key'] = df_dns['timestamp'].dt.floor('1min') correlated = df_fw.merge( df_proxy[['src_ip','time_key','url','user_agent','proxy_bytes']], on=['src_ip','time_key'], how='left' ).merge( df_dns[['src_ip','time_key','query','dns_type','response_ip']], on=['src_ip','time_key'], how='left' ) correlated['has_proxy'] = correlated['url'].notna().astype(int) correlated['has_dns'] = correlated['query'].notna().astype(int) multi_source = correlated[(correlated['has_proxy']==1) & (correlated['has_dns']==1)] print(f'Çok kaynak korelasyon: {len(multi_source):,} kayıt') print(multi_source[['src_ip','fw_bytes','proxy_bytes','url','query']].head(10))

Operasyonel Dogrulama ve Raporlama

Pandas ile DNS log analizi ve tünelleme tespiti. String operasyonları ve istatistiksel özellik mühendisliği: import pandas as pd import numpy as np from scipy.stats import entropy df_dns = pd.read_parquet('dns_logs.parquet') df_dns['timestamp'] = pd.to_datetime(df_dns['timestamp']) df_dns['fqdn_len'] = df_dns['query'].str.len() df_dns['subdomain'] = df_dns['query'].str.split('.').str[0] df_dns['subdomain_len'] = df_dns['subdomain'].str.len() df_dns['label_count'] = df_dns['query'].str.count('\.') df_dns['has_numeric'] = df_dns['subdomain'].str.contains('\d{5,}', regex=True).astype(int) df_dns['entropy'] = df_dns['subdomain'].apply( lambda x: entropy([x.count(c)/len(x) for c in set(x)]) if len(x) > 0 else 0 ) tunnel_features = df_dns.groupby('src_ip').agg( mean_fqdn_len=('fqdn_len','mean'), mean_subdomain_len=('subdomain_len','mean'), mean_entropy=('entropy','mean'), max_entropy=('entropy','max'), numeric_ratio=('has_numeric','mean'), query_count=('query','count'), unique_domains=('query','nunique') ).reset_index() suspect = tunnel_features[ (tunnel_features['mean_subdomain_len'] > 30) | (tunnel_features['mean_entropy'] > 3.5) | (tunnel_features['numeric_ratio'] > 0.4) ] print(suspect.sort_values('mean_entropy', ascending=False).head(10))

Bu bölümün pratik akışı şu sırayla ilerler:

  • entropy

Cikti ve Kullanım Amaci

Pandas ile SOC log analizinde sık kullanılan aggregation fonksiyonları farklı analitik ihtiyaçları karşılar. Örnek agg kombinasyonları: - nunique: kaç farklı hedef IP'ye bağlandı - sum: toplam transfer edilen byte - mean: ortalama oturum süresi - std: davranış tutarsızlığı ölçümü - lambda x: (x=='BLOCK').mean() — bloklama oranı

  • nunique: Kaç farklı hedefe bağlanıldığını sayar
  • std: Davranış tutarsızlığını ve volatiliteyi ölçer
  • quantile(0.95): Aykırı değer eşiğini veri bazlı belirler
  • lambda x: (x>0).mean(): Belirli olayın gerçekleşme oranını hesaplar

Son Kavram ve Cikis

Bu bölümde öne çıkan çekirdek kavram ws[1] olarak verilir. Pandas ile otomatik SOC raporu oluşturma. Analiz sonuçlarını Excel'e çok sayfalı olarak aktarma ve renklendirme: import pandas as pd from openpyxl.styles import PatternFill, Font, Alignment from openpyxl.utils.dataframe import dataframe_to_rows from datetime import datetime def export_soc_report(report_data, filename=None): if filename is None: filename = f'soc_report_{datetime.now().strftime("%Y%m%d_%H%M")}.xlsx' with pd.ExcelWriter(filename, engine='openpyxl') as writer: for sheet_name, df_sheet in report_data.items(): df_sheet.to_excel(writer, sheet_name=sheet_name, index=False) ws = writer.sheets[sheet_name] for cell in ws[1]: cell.fill = PatternFill('solid', fgColor='1F4E79') cell.font = Font(color='FFFFFF', bold=True) cell.alignment = Alignment(horizontal='center') ws.freeze_panes = 'A2' print(f'Rapor oluşturuldu: {filename}') return filename report_data = { 'Kritik_Alarmlar': df_critical, 'Port_Tarama': df_scanners, 'DNS_Tünelleme': suspect, 'Kullanıcı_Anomalisi': suspicious_users } export_soc_report(report_data)

Bu Egitimden Ne Kazanirsiniz?

Bu icerik, Python Pandas ile Log Analizi konusunu SOC L3 - İleri Tehdit Avcılığı - Veri Madenciliği ve Modelleme baglaminda parcali degil, butunlu bir ogrenme akisina donusturur. Yalnizca kavramlari ezberlemek yerine surec sirasini, bilesenler arasi iliskiyi ve hangi kanitin neden onemli oldugunu kavramayi hedefler.

Ozet

Bu ders kapsaminda one cikan basliklar: category, uint16, float32, datetime64, Vektörize numpy ops, Parquet formatı, query() metodu, category dtype, nunique, std. Egitimin mantigi; once temel akis kurmak, sonra eslestirme ve kavram netlestirme yapmak, en sonda ise bulguyu operasyonel bir sonuca baglamaktir.