Jupyter Notebook ile SOC Analiz İş Akışı
Jupyter Notebook SOC analistlerinin veri analizi, görselleştirme ve raporlamayı tek ortamda yapmasını sağlar. Splunk REST API ile Jupyter entegrasyonu: import requests import pandas as pd import json from IPython.display import display SPLUNK_HOST = 'https://splunk.example.com:8089' SPLUNK_USER = 'analyst' SPLUNK_PASS = 'password' def splunk_query(spl, earliest='-24h', latest='now'): search_url = f'{SPLUNK_HOST}/services/search/jobs' payload = { 'search': f'search {spl}', 'earliest_time': earliest, 'latest_time': latest, 'output_mode': 'json' } results_url = f'{SPLUNK_HOST}/services/search/jobs/{sid}/results' while True: status = requests.get(f'{SPLUNK_HOST}/services/search/jobs/{sid}', params={'output_mode':'json'}, auth=auth, verify=False) if status.json()['entry'][0]['content']['isDone']: break results = requests.get(results_url, params={'output_mode':'json','count':10000}, auth=auth, verify=False) df = splunk_query('index=network | stats sum(bytes_out) as total_bytes by src_ip | sort -total_bytes')
Giris ve Temel Akis
Jupyter Notebook SOC analistlerinin veri analizi, görselleştirme ve raporlamayı tek ortamda yapmasını sağlar. Splunk REST API ile Jupyter entegrasyonu: import requests import pandas as pd import json from IPython.display import display SPLUNK_HOST = 'https://splunk.example.com:8089' SPLUNK_USER = 'analyst' SPLUNK_PASS = 'password' def splunk_query(spl, earliest='-24h', latest='now'): search_url = f'{SPLUNK_HOST}/services/search/jobs' payload = { 'search': f'search {spl}', 'earliest_time': earliest, 'latest_time': latest, 'output_mode': 'json' } results_url = f'{SPLUNK_HOST}/services/search/jobs/{sid}/results' while True: status = requests.get(f'{SPLUNK_HOST}/services/search/jobs/{sid}', params={'output_mode':'json'}, auth=auth, verify=False) if status.json()['entry'][0]['content']['isDone']: break results = requests.get(results_url, params={'output_mode':'json','count':10000}, auth=auth, verify=False) df = splunk_query('index=network | stats sum(bytes_out) as total_bytes by src_ip | sort -total_bytes')
Bu bölümün pratik akışı şu sırayla ilerler:
- auth = (SPLUNK_USER, SPLUNK_PASS)
- r = requests.post(search_url, data=payload, auth=auth, verify=False)
- sid = r.json()['sid']
- while True: status = requests.get(f'{SPLUNK_HOST}/services/search/jobs/{sid}', params={'output_mode':'json'}, auth=auth); if status.json()['entry'][0]['content']['isDone']: break
- results = requests.get(results_url, params={'output_mode':'json','count':10000}, auth=auth, verify=False)
- return pd.DataFrame(results.json()['results'])
- display(df.head(10))
Temel Kavram Eslesmeleri
Jupyter Notebook SOC kullanım senaryolarında farklı kütüphane kombinasyonları kullanılır. Her analiz tipi için uygun kütüphane seti seçilmelidir. Örnek SOC notebook tipleri: - Threat hunting notebook: pandas + elasticsearch-py + plotly - Malware analiz notebook: pefile + capstone + yara-python - Network forensics notebook: scapy + dpkt + geoip2 - IR timeline notebook: pandas + matplotlib + networkx
- Threat hunting: pandas + elasticsearch-py + plotly
- Malware analizi: pefile + capstone + yara-python
- Network forensics: scapy + dpkt + geoip2
- IR timeline: pandas + matplotlib + networkx
Ilk Cekirdek Kavram
Bu bölümde öne çıkan çekirdek kavram scan olarak verilir. Elastic Python client ile Jupyter'dan büyük veri setini scroll API kullanarak verimli çekme: from elasticsearch import Elasticsearch from elasticsearch.helpers import scan import pandas as pd es = Elasticsearch(['https://localhost:9200'], http_auth=('elastic','password'), verify_certs=False ) query = { "query": { "bool": { "must": [ {"range": {"@timestamp": {"gte": "now-24h"}}}, {"term": {"event.category": "network"}} ], "must_not": [ {"terms": {"source.ip": ["127.0.0.1","::1"]}} ] } }, "_source": ["@timestamp","source.ip","destination.ip","network.bytes","event.action"] } results = list(scan(es, query=query, index='logs-network-*', scroll='5m', size=1000)) df = pd.DataFrame([r['_source'] for r in results]) print(f'Çekilen kayıt: {len(df)}') print(df.dtypes)
Arac, Komut veya Inceleme Akisi
Jupyter'da etkileşimli SOC dashboard oluşturma. ipywidgets ile dinamik filtreli tehdit analizi: import ipywidgets as widgets import pandas as pd import plotly.express as px from IPython.display import display time_range = widgets.Dropdown( options=[('Son 1 saat',1),('Son 6 saat',6),('Son 24 saat',24),('Son 7 gün',168)], value=24, description='Zaman:' ) def on_refresh(b): with output: output.clear_output() hours = time_range.value score = min_score.value df_filtered = df[ (df['timestamp'] >= pd.Timestamp.now() - pd.Timedelta(hours=hours)) & (df['risk_score'] >= score) ] fig = px.scatter(df_filtered, x='timestamp', y='risk_score', color='risk_level', hover_data=['src_ip','mitre_technique'], title=f'Son {hours}h Risk Skoru Dağılımı (min={score})') fig.show() display(df_filtered[['src_ip','risk_score','mitre_technique']].head(20))
Bu bölümün pratik akışı şu sırayla ilerler:
- time_range = widgets.Dropdown(options=[('Son 1 saat',1),('Son 24 saat',24)], value=24)
- min_score = widgets.IntSlider(value=50, min=0, max=100, description='Min Risk:')
- refresh_btn = widgets.Button(description='Yenile', button_style='primary')
- output = widgets.Output()
- def on_refresh(b): with output: df_filtered=df[(df['risk_score']>=min_score.value)]; fig=px.scatter(df_filtered,...); fig.show()
- refresh_btn.on_click(on_refresh)
- display(widgets.VBox([time_range, min_score, refresh_btn, output]))
Kanit ve Bilesen Iliskileri
SOC Jupyter notebook'larının yapılandırılması güvenlik ve kullanılabilirlik açısından kritiktir. Kurumsal SOC ortamında notebook altyapısı doğru kurulmalıdır. Örnek kurumsal Jupyter gereksinimleri: - JupyterHub: çok kullanıcılı merkezi erişim - nbformat: notebook versiyon kontrolü için Git entegrasyonu - papermill: notebook otomasyonu ve zamanlanmış çalıştırma - nbconvert: notebook'u PDF/HTML rapor olarak dışa aktarma
- JupyterHub: Çok kullanıcılı merkezi SOC notebook sunucusu
- papermill: Parametreli notebook otomasyonu ve zamanlama
- nbconvert: Notebook'u PDF/HTML yönetim raporuna çevirir
- nbformat: Notebook versiyon kontrolü için Git uyumlu format
Ikincil Odak Noktasi
Bu bölümde öne çıkan çekirdek kavram parameters olarak verilir. papermill ile Jupyter notebook'larını parametre alarak otomatik çalıştırma. Günlük tehdit avcılığı raporunu zamanlanmış olarak üretme: import papermill as pm from datetime import datetime, timedelta import os def run_daily_hunt_notebook(hunt_type, lookback_hours=24): today = datetime.now().strftime('%Y%m%d') input_nb = f'templates/hunt_{hunt_type}.ipynb' output_nb = f'reports/{today}hunt{hunt_type}.ipynb' output_html = f'reports/{today}hunt{hunt_type}.html' pm.execute_notebook( input_nb, output_nb, parameters={ 'lookback_hours': lookback_hours, 'min_risk_score': 50, 'report_date': today, 'analyst': 'SOC_L3_AUTO' }, kernel_name='python3', log_output=True ) os.system(f'jupyter nbconvert --to html {output_nb} --output {output_html}') print(f'Rapor üretildi: {output_html}') run_daily_hunt_notebook('lateral_movement', lookback_hours=24) run_daily_hunt_notebook('data_exfiltration', lookback_hours=24)
Operasyonel Dogrulama ve Raporlama
Jupyter'da IR (Incident Response) timeline notebook'u. Olay zaman çizelgesini otomatik oluşturma ve kanıt zinciri dokümantasyonu: import pandas as pd import plotly.figure_factory as ff from datetime import datetime import hashlib import json def create_ir_timeline(incident_id, df_events): df_events['evidence_hash'] = df_events.apply( lambda r: hashlib.sha256(str(r.values).encode()).hexdigest()[:16], axis=1 ) gantt_data = [] for _, row in df_events.iterrows(): gantt_data.append({ 'Task': f"{row['event_type']} — {row['hostname']}", 'Start': row['timestamp'], 'Finish': row['timestamp'] + pd.Timedelta(minutes=1), 'Resource': row['mitre_technique'] }) fig = ff.create_gantt(gantt_data, index_col='Resource', show_colorbar=True, group_tasks=True, title=f'IR Timeline — {incident_id}') report = { 'incident_id': incident_id, 'generated_at': datetime.now().isoformat(), 'event_count': len(df_events), 'timeline': df_events[['timestamp','event_type','hostname','mitre_technique','evidence_hash']].to_dict('records') } with open(f'ir_reports/{incident_id}_timeline.json','w') as f: json.dump(report, f, indent=2, default=str) print(f'IR Timeline kaydedildi: {incident_id}_timeline.json') create_ir_timeline('IR-2024-0142', df_incident_events)
Bu bölümün pratik akışı şu sırayla ilerler:
- df_events = df_events.sort_values('timestamp')
- df_events['evidence_hash'] = df_events.apply(lambda r: hashlib.sha256(str(r.values).encode()).hexdigest()[:16], axis=1)
- gantt_data = [{'Task':f"{row['event_type']}", 'Start':row['timestamp'], 'Resource':row['mitre_technique']} for _,row in df_events.iterrows()]
- fig = ff.create_gantt(gantt_data, index_col='Resource', show_colorbar=True)
- fig.show()
- report = {'incident_id':incident_id,'event_count':len(df_events),'timeline':df_events.to_dict('records')}
- with open(f'ir_reports/{incident_id}_timeline.json','w') as f: json.dump(report, f, indent=2, default=str)
Cikti ve Kullanım Amaci
SOC notebook güvenliği kurumsal ortamda kritik gereksinimler doğurur. Notebook'ların güvenli kullanımı için kontroller tanımlanmalıdır. Örnek güvenlik gereksinimleri: - Credential yönetimi: API key'ler notebook içinde düz metin olmamalı - Erişim kontrolü: JupyterHub LDAP entegrasyonu ile SSO - Audit log: kim hangi notebook'u ne zaman çalıştırdı - Output temizleme: hassas veri içeren output commit edilmemeli
- Credential yönetimi: python-dotenv veya HashiCorp Vault entegrasyonu
- Erişim kontrolü: JupyterHub LDAP/OAuth ile SSO kimlik doğrulama
- Output temizleme: pre-commit hook ile nbstripout otomasyonu
- Audit log: JupyterHub activity log ve kernel event izleme
Son Kavram ve Cikis
Bu bölümde öne çıkan çekirdek kavram %store olarak verilir. Jupyter Magic komutları SOC analizini hızlandırır. %%time, %run ve özel magic komutlar ile analiz iş akışını optimize etme: import IPython from IPython.core.magic import register_cell_magic import pandas as pd import time @register_cell_magic def splunk_hunt(line, cell): """Splunk sorgusunu çalıştıran custom magic""" start = time.time() spl_query = cell.strip() df = splunk_query(spl_query) elapsed = time.time() - start print(f'Sorgu süresi: {elapsed:.2f}s | Sonuç: {len(df)} kayıt') return df # Kullanım: # %%splunk_hunt # index=network earliest=-1h # | stats sum(bytes_out) as total_bytes by src_ip # | sort -total_bytes # | head 20 # Performans ölçümü # %timeit splunk_query('index=network | head 100') # Başka notebook çalıştırma # %run baseline_builder.ipynb # Değişken paylaşımı # %store df_baseline # %store -r df_baseline
Bu Egitimden Ne Kazanirsiniz?
Bu icerik, Jupyter Notebook ile SOC Analiz İş Akışı konusunu SOC L3 - İleri Tehdit Avcılığı - Veri Madenciliği ve Modelleme baglaminda parcali degil, butunlu bir ogrenme akisina donusturur. Yalnizca kavramlari ezberlemek yerine surec sirasini, bilesenler arasi iliskiyi ve hangi kanitin neden onemli oldugunu kavramayi hedefler.
Ozet
Bu ders kapsaminda one cikan basliklar: Threat hunting, Malware analizi, Network forensics, IR timeline, JupyterHub, papermill, nbconvert, nbformat, Credential yönetimi, Erişim kontrolü. Egitimin mantigi; once temel akis kurmak, sonra eslestirme ve kavram netlestirme yapmak, en sonda ise bulguyu operasyonel bir sonuca baglamaktir.