CyberFlow Logo CyberFlow BLOG
Soc L3 Cloud Security

Splunk SPL ile İleri Seviye Veri Sorgulama

✍️ Ahmet BİRKAN 📂 Soc L3 Cloud Security

Splunk SPL ile İleri Seviye Veri Sorgulama konusunu SOC L3 - İleri Tehdit Avcılığı - Veri Madenciliği ve Modelleme baglaminda blog formatinda ogrenin. Temel akis, kavram eslestirmeleri ve analiz mantigi tek bir yapida birlestirildi.

Splunk SPL ile İleri Seviye Veri Sorgulama

SPL'de transaction komutu birbirine bağlı olayları tek kayıtta gruplar. Saldırı zincirini tek sorguda yakalamak için kullanılır: index=wineventlog earliest=-24h (EventCode=4624 OR EventCode=4625 OR EventCode=4688 OR EventCode=4698) | eval has_process_create=if(mvfind(EventCode,"4688")>=0,1,0) | eval has_schtask=if(mvfind(EventCode,"4698")>=0,1,0) | eval attack_chain_score=( has_failed_login * 20 + has_process_create * 30 + has_schtask * 50 ) | table user, duration, chain_length, attack_chain_score, EventCode

Giris ve Temel Akis

SPL'de transaction komutu birbirine bağlı olayları tek kayıtta gruplar. Saldırı zincirini tek sorguda yakalamak için kullanılır: index=wineventlog earliest=-24h (EventCode=4624 OR EventCode=4625 OR EventCode=4688 OR EventCode=4698) | eval has_process_create=if(mvfind(EventCode,"4688")>=0,1,0) | eval has_schtask=if(mvfind(EventCode,"4698")>=0,1,0) | eval attack_chain_score=( has_failed_login * 20 + has_process_create * 30 + has_schtask * 50 ) | table user, duration, chain_length, attack_chain_score, EventCode

Bu bölümün pratik akışı şu sırayla ilerler:

  • index=wineventlog earliest=-24h (EventCode=4624 OR EventCode=4625 OR EventCode=4688 OR EventCode=4698)
  • | transaction user maxspan=1h maxpause=10m keepevicted=true
  • | eval chain_length=eventcount
  • | eval has_failed_login=if(mvfind(EventCode,"4625")>=0,1,0)
  • | eval attack_chain_score=(has_failed_login20 + has_process_create30 + has_schtask*50)
  • | where attack_chain_score >= 50
  • | table user, duration, chain_length, attack_chain_score
  • | sort -attack_chain_score

Temel Kavram Eslesmeleri

SPL'nin ileri seviye komutları karmaşık güvenlik sorgularını verimli çalıştırır. Her komutun doğru kullanım senaryosu performansı doğrudan etkiler. Örnek komut karşılaştırması: - stats vs eventstats: stats sonuçları küçültür, eventstats orijinal olayları korur - transaction vs stats: transaction zaman bilgisini korur, stats sadece agregasyon yapar - join vs lookup: lookup daha hızlı, join daha esnek ama yavaş - map vs foreach: map alt sorgu çalıştırır, foreach field üzerinde döner

  • eventstats: Grup istatistiğini orijinal olay satırına ekler
  • streamstats: Zaman sıralı kümülatif istatistik hesaplar
  • transaction: İlişkili olayları zaman penceresinde gruplar
  • map: Her sonuç için dinamik alt sorgu çalıştırır

Ilk Cekirdek Kavram

Bu bölümde öne çıkan çekirdek kavram köşeli olarak verilir. SPL'de subsearch ile dinamik tehdit listesi oluşturulur. Son 1 saatte brute force yapan IP'leri gerçek zamanlı blocklist olarak kullanma: index=network earliest=-1h [search index=wineventlog earliest=-1h EventCode=4625 | stats count by src_ip | where count > 20 | rename src_ip as src_ip | fields src_ip] | stats sum(bytes_out) as total_bytes, dc(dst_ip) as unique_dst, dc(dst_port) as unique_ports by src_ip | eval threat_context="brute_force_source" | table src_ip, total_bytes, unique_dst, unique_ports, threat_context | sort -total_bytes Subsearch köşeli parantez içinde yazılır ve dış sorguya filtre olarak beslenir.

Arac, Komut veya Inceleme Akisi

SPL'de tstats komutu summary index veya tsidx üzerinden çalışarak milyarlarca olayı saniyeler içinde sorgular. Normal stats'tan 10-100x daha hızlıdır: | tstats summariesonly=false count AS conn_count, sum(All_Traffic.bytes_out) AS total_bytes, dc(All_Traffic.dest_ip) AS unique_dst, dc(All_Traffic.dest_port) AS unique_ports FROM datamodel=Network_Traffic BY All_Traffic.src_ip, All_Traffic.action SPAN=1h | eval bytes_per_conn=round(total_bytes/conn_count,0)

Bu bölümün pratik akışı şu sırayla ilerler:

  • | tstats summariesonly=false count AS conn_count, sum(All_Traffic.bytes_out) AS total_bytes FROM datamodel=Network_Traffic
  • WHERE All_Traffic.src_ip!="10.0.0.0/8"
  • BY All_Traffic.src_ip, All_Traffic.action SPAN=1h
  • | rename All_Traffic.* AS *
  • | where action="blocked" AND conn_count > 1000
  • | eval scanner_score=if(unique_dst>50 AND unique_ports>20,"HIGH","LOW")
  • | table src_ip, conn_count, total_bytes, unique_dst, unique_ports, scanner_score
  • | sort -conn_count

Kanit ve Bilesen Iliskileri

SPL lookup komutları dış veri kaynaklarını sorgularla birleştirir. Tehdit istihbaratı, asset envanteri ve kullanıcı dizini ile zenginleştirme yapılır. Örnek lookup tipleri: - inputlookup: CSV veya KV store'dan veri okur - outputlookup: sonuçları CSV'ye yazar - lookup: alan bazlı eşleştirme yapar - mlookup: çok alanlı eşleştirme, regex destekler

  • inputlookup: Threat intel CSV'sini sorguya besler
  • outputlookup: Baseline verisini kalıcı olarak CSV'ye kaydeder
  • lookup: src_ip'yi TI tablosundan zenginleştirir
  • KV Store lookup: Dinamik risk skorlarını gerçek zamanlı günceller

Ikincil Odak Noktasi

Bu bölümde öne çıkan çekirdek kavram (?P) olarak verilir. SPL'de regex ile gelişmiş log parse etme ve yapılandırılmamış veri analizi. Apache access log'undan User-Agent parmak izi çıkarma: index=weblogs earliest=-24h | rex field=_raw ""(?P\w+) (?P\S+) HTTP/[\d.]+" (?P\d{3}) (?P\d+) "[^"]*" "(?P[^"]+)"" | eval is_bot=if(match(user_agent,"bot|crawler|spider|scan|zgrab|masscan"),1,0) | eval is_vuln_scan=if(match(user_agent,"nikto|nessus|openvas|sqlmap|nuclei"),1,0) | eval ua_entropy=len(replace(user_agent,"[a-zA-Z0-9]",""))/len(user_agent) | stats count AS request_count, dc(uri) AS unique_uris, avg(ua_entropy) AS avg_entropy by src_ip, user_agent, is_bot, is_vuln_scan | where is_vuln_scan=1 OR (is_bot=0 AND avg_entropy>0.4 AND unique_uris>100) | table src_ip, user_agent, request_count, unique_uris, is_vuln_scan | sort -request_count

Operasyonel Dogrulama ve Raporlama

SPL'de makro ve saved search kullanımı tekrar eden sorguları modüler hale getirir. Güvenlik operasyonlarında standart avcılık sorgularını merkezi yönetme:

Makro tanımı (Settings > Advanced Search > Search Macros)

Makro adı: high_risk_auth_filter(1)

Argüman: threshold

İçerik:

index=wineventlog (EventCode=4624 OR EventCode=4625) | stats sum(eval(if(EventCode=4625,1,0))) AS failed, sum(eval(if(EventCode=4624,1,0))) AS success by user, src_ip | eval fail_rate=round(failed/(failed+success+0.001)*100,1) | where failed > $threshold$

Kullanım:

Bu bölümün pratik akışı şu sırayla ilerler:

  • high_risk_auth_filter(10)
  • | eval login_hour=strftime(_time,"%H")
  • | eval is_off_hours=if(login_hour<7 OR login_hour>20,1,0)
  • | where is_off_hours=1
  • | table user, src_ip, failed, success, fail_rate
  • | sort -failed

Cikti ve Kullanım Amaci

SPL performans optimizasyonu büyük veri setlerinde kritik öneme sahiptir. Yanlış yazılmış sorgular Splunk cluster'ı yavaşlatır ve SLA'yı etkiler. Örnek performans karşılaştırması: - Kötü: index=* | search src_ip=10.1.1.1 (tüm index taranır) - İyi: index=network src_ip=10.1.1.1 (hedefli arama) - Kötü: | where like(src_ip,"10.1.1%") (her satır işlenir) - İyi: index=network src_ip=10.1.1.* (index-time filtre)

  • Index ve sourcetype belirt: Bloom filter devreye girer, tarama alanı daralır
  • Filtreleri pipeline başına al: Az veri ileriki komutlara taşınır
  • tstats kullan: Summary index okunur, raw log işlenmez
  • Wildcard sona koy: src_ip=10.1.* geçerli, *10.1 geçersiz

Son Kavram ve Cikis

Bu bölümde öne çıkan çekirdek kavram collect olarak verilir. SPL'de corelation search ve notable event üretimi SIEM workflow'unun temel taşıdır. ES (Enterprise Security) modülünde risk based alerting: index=network earliest=-1h | tstats summariesonly=false count AS conn_count, sum(All_Traffic.bytes_out) AS total_bytes FROM datamodel=Network_Traffic BY All_Traffic.src_ip SPAN=5m | rename All_Traffic.src_ip AS src_ip | lookup asset_inventory ip AS src_ip OUTPUT department, criticality | eval risk_score=case( total_bytes>100000000 AND criticality="high", 90, total_bytes>50000000, 60, conn_count>10000, 40, true(), 10 ) | eval risk_object=src_ip | eval risk_object_type="system" | where risk_score >= 60 | collect index=risk_index collect komutu sonuçları risk_index'e yazarak ES Risk Based Alerting sistemini besler.

Bu Egitimden Ne Kazanirsiniz?

Bu icerik, Splunk SPL ile İleri Seviye Veri Sorgulama konusunu SOC L3 - İleri Tehdit Avcılığı - Veri Madenciliği ve Modelleme baglaminda parcali degil, butunlu bir ogrenme akisina donusturur. Yalnizca kavramlari ezberlemek yerine surec sirasini, bilesenler arasi iliskiyi ve hangi kanitin neden onemli oldugunu kavramayi hedefler.

Ozet

Bu ders kapsaminda one cikan basliklar: eventstats, streamstats, transaction, map, inputlookup, outputlookup, lookup, KV Store lookup, Index ve sourcetype belirt, Filtreleri pipeline başına al. Egitimin mantigi; once temel akis kurmak, sonra eslestirme ve kavram netlestirme yapmak, en sonda ise bulguyu operasyonel bir sonuca baglamaktir.