📝 Séance 2 : ELK Stack — Filebeat, Elasticsearch et dashboards Kibana

Bloc B2 3h30 C2.4.2 · M2.4
FormationBTS SIO option SISR — IRIS Mediaschool
Bloc / ModuleB2 · M2.4 — Compétence B2.4 / Lien B3.4
Durée3h30
PrérequisAvoir complété la Séance 1 — rsyslog et centralisation initiale

🎯 Introduction (10 min)

Dans un cas d'étude réel, une attaque n'a été découverte que trois mois après les faits parce que les logs étaient stockés uniquement localement et avaient été écrasés par la rotation quotidienne. Le manque de conservation centralisée et de corrélation a rendu le forensic impossible. Cette séance montre comment déployer un pipeline ELK (Elasticsearch, Logstash, Kibana) pour indexer, rechercher et alerter sur des événements critiques.

🎯 Objectifs de la séance

  • Décrire l'architecture ELK et les concepts essentiels d'Elasticsearch (cluster, index, shards primaires et réplicas, mapping dynamique vs explicite).
  • Déployer et configurer Filebeat pour collecter les logs locaux et modules (system, nginx) et ingérer dans Logstash / Elasticsearch.
  • Créer des dashboards Kibana simples (tentatives SSH échouées, top IPs, trafic Apache) et une règle d'alerte brute-force SSH.

📖 1. Architecture Elasticsearch et concepts (50 min)

Elasticsearch est un moteur de recherche distribué qui stocke des documents JSON dans des index. Un cluster est composé de nœuds ; chaque index est partitionné en shards primaires, et chaque shard primaire peut avoir des shards réplicas. Les réplicas assurent la haute disponibilité et la capacité de lecture.

Le mapping définit le type et la structure des champs : il peut être dynamique (Elasticsearch infère les types à l'ingestion) ou explicite (on pré-déclare le mapping pour maîtriser l'analyse et éviter des erreurs de typage).

Concepts clés

ConceptDescription
ClusterEnsemble de nœuds partageant les mêmes données et charge
IndexCollection de documents JSON avec un mapping commun
Shard primairePartition atomique de l'index, accepte les écritures
Shard réplicaCopie d'un primaire pour la HA et la lecture distribuée
Mapping dynamiqueTypes inférés automatiquement à l'ingestion
Mapping expliciteTypes pré-déclarés pour contrôler l'analyse

Commandes REST utiles

# État du cluster
curl -sS http://localhost:9200/_cluster/health?pretty

# Lister les indices
curl -sS http://localhost:9200/_cat/indices?v

# Rechercher dans un index
curl -sS -XPOST 'http://localhost:9200/logs-*/_search?pretty' \
  -H 'Content-Type: application/json' \
  -d '{"query":{"match_all":{}}}'
💡 Bonne pratique

Définir un Index Template pour que les nouveaux indices logs-%{+YYYY.MM.dd} aient des mappings et des politiques ILM appropriées dès leur création.


📖 2. Logstash : pipeline de parsing (40 min)

Logstash reçoit les données (input), les parse (filter) et les envoie (output). Le pipeline suit toujours cette architecture en trois étapes.

Pipeline minimal

input {
  beats { port => 5044 }
}
filter {
  # grok pour parser syslog / apache / sshd
  grok { match => { "message" => ["%{SYSLOGBASE}", "%{COMMONAPACHELOG}", "%{SSHLOG}"] } }
  date { match => [ "timestamp", "ISO8601" ] }
  mutate { remove_field => ["@version", "host"] }
}
output {
  elasticsearch { hosts => ["localhost:9200"] index => "logs-%{+YYYY.MM.dd}" }
  stdout { codec => rubydebug }
}

Grok patterns utiles

PatternUsage
%{SYSLOGTIMESTAMP:timestamp} %{HOSTNAME:host} sshd\[%{NUMBER:pid}\]: %{GREEDYDATA:message}Logs sshd
%{COMBINEDAPACHELOG}Logs Apache Combined
%{SYSLOG5424PRI:pri}%{ISO8601:timestamp} %{HOSTNAME:hostname} %{DATA:program} %{GREEDYDATA:msg}Syslog RFC 5424
⚠️ Ordre des filtres

L'ordre des filtres est important : appliquer grok avant date, puis les enrichissements (geoip, user-agent) si nécessaire.


📖 3. Kibana : découverte, dashboards et alertes (40 min)

Kibana consomme les indices Elasticsearch via un index pattern (ex : logs-*). La vue Discover permet d'interroger avec KQL (Kibana Query Language) et d'isoler les événements.

Exemples de requêtes KQL

Requête KQLDescription
event.id:4625Échecs d'authentification Windows
program:sshd and message:FailedTentatives SSH échouées
http.response.status_code:5*Erreurs serveur HTTP 5xx

Widgets pour un dashboard opérationnel

  • Bar chart : auth failures per source IP (interval=1m)
  • Pie chart : % errors by program
  • Metric : total events / last 24h
  • Data table : Top 10 source IPs by event count

Alertes — Kibana Rules

On peut définir une règle de type threshold ou EQL. Exemple logique pour brute-force SSH :

  • Source : program:sshd AND message:Failed
  • Condition : WHEN count() IS ABOVE 5 FOR last 60 seconds GROUPED BY source.ip
  • Action : envoyer un e-mail et/ou créer un ticket via webhook.

📖 4. Filebeat : collecte et modules (20 min)

Filebeat est un agent léger pour collecter des fichiers log et disposer de modules prêts à l'emploi (system, nginx, apache).

Configuration minimale filebeat.yml

filebeat.inputs:
  - type: log
    enabled: true
    paths:
      - /var/log/*.log

filebeat.config.modules:
  path: "/etc/filebeat/modules.d/*.yml"
  reload.enabled: false

processors:
  - add_host_metadata: ~

output.elasticsearch:
  hosts: ["localhost:9200"]

Activer les modules system et nginx

sudo filebeat modules enable system nginx
sudo systemctl restart filebeat

Pour envoyer vers Logstash (parsing avancé) plutôt que directement vers Elasticsearch :

output.logstash:
  hosts: ["localhost:5044"]

📖 5. Règles de corrélation SIEM basiques (20 min)

Quelques règles simples et efficaces pour débuter :

Brute-force SSH

Si event_count > 5 échecs (message contenant "Failed" ou Event ID 4625) depuis la même adresse source en 60s, déclencher alerte et blocage temporaire.

FROM events WHERE event.type IN (auth_fail)
GROUP BY src_ip
WINDOW 60s
HAVING count(*) > 5
=> ALERT

Authentification hors horaires

Connexions réussies (Event ID 4624 / sshd Accepted) entre 22h00 et 06h00 → alerte de suspicion.

Scan réseau

Plus de 50 connexions provenant d'une même IP vers des ports différents en 30s → suspicion de scan de ports.

💡 Implémentation

Ces règles peuvent être implémentées dans Kibana (threshold/EQL) ou dans un moteur de corrélation dédié (ex : OpenSearch, Wazuh).


📖 6. Rétention, ILM et RGPD (10 min)

La gestion du cycle de vie des indices (ILM) permet d'automatiser le passage hot → warm → cold → delete.

Phase ILMDurée conseillée (PME)Description
hot14 joursÉcriture et recherche rapide
warm3 moisRecherche moins fréquente
coldJusqu'à 1 anArchivé pour conservation
deleteAprès 1 anSuppression (si non requis légalement)
⚠️ Conformité ANSSI / RGPD

L'ANSSI recommande une conservation minimale d'un an pour certains logs critiques. Pour la conformité RGPD, anonymiser ou tronquer les adresses IP après 6 mois (remplacement du dernier octet) et stocker uniquement les métadonnées nécessaires à la traçabilité.


💻 Travaux Pratiques (120 min)

Contexte

Vous déployez la pile ELK (version 8.x) sur debian-srv01 pour indexer les logs collectés lors de la séance 1. L'objectif est d'avoir une chaîne complète : Filebeat → Logstash → Elasticsearch (+Kibana) et des visualisations/alertes basiques.

Objectif

Installer Elasticsearch, Logstash, Kibana, Filebeat sur debian-srv01 ; configurer Filebeat avec modules system et nginx ; créer un dashboard Kibana montrant : tentatives SSH échouées (Event ID 4625), top IPs sources, volume trafic web Apache ; créer une règle d'alerte brute-force (>5 failures/minute).

Prérequis techniques

  • Accès root sur debian-srv01
  • Ports ouverts : 9200 (ES), 5601 (Kibana), 5044 (Logstash)

Étape 1 — Installer la stack ELK

# Clé GPG et dépôt Elastic 8.x
wget -qO - https://artifacts.elastic.co/GPG-KEY-elasticsearch | sudo apt-key add -
sudo apt-get install apt-transport-https -y
echo "deb https://artifacts.elastic.co/packages/8.x/apt stable main" \
  | sudo tee /etc/apt/sources.list.d/elastic-8.x.list
sudo apt update
sudo apt install elasticsearch logstash kibana filebeat -y

Étape 2 — Démarrer et vérifier Elasticsearch / Kibana

sudo systemctl enable --now elasticsearch
# Vérifier le statut du cluster
curl -sS http://localhost:9200/_cluster/health?pretty
sudo systemctl enable --now kibana

Étape 3 — Configurer le pipeline Logstash

Fichier /etc/logstash/conf.d/50-beats.conf :

input { beats { port => 5044 } }
filter {
  grok { match => { "message" => ["%{SYSLOGBASE}", "%{COMBINEDAPACHELOG}"] } }
  date { match => ["timestamp", "ISO8601"] }
}
output {
  elasticsearch { hosts => ["localhost:9200"] index => "logs-%{+YYYY.MM.dd}" }
}
sudo systemctl restart logstash

Étape 4 — Configurer Filebeat

Fichier /etc/filebeat/filebeat.yml :

filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /var/log/*.log

filebeat.config.modules:
  path: "/etc/filebeat/modules.d/*.yml"

processors:
  - add_host_metadata: ~

output.logstash:
  hosts: ["localhost:5044"]
sudo filebeat modules enable system nginx
sudo filebeat setup        # charge les dashboards Kibana fournis
sudo systemctl enable --now filebeat

Étape 5 — Créer le dashboard Kibana

  1. Dans Kibana, créer un index pattern logs-*.
  2. Discover : rechercher event.id:4625 OR (program:sshd AND message:Failed) et sauvegarder la recherche.
  3. Visualize : créer un bar chart count by source.ip pour afficher les top IPs.
  4. Combiner les visualisations sur un dashboard : "SSH failures & Web traffic".

Étape 6 — Créer une règle d'alerte brute-force

Dans Kibana Detection Engine, créer une Threshold Rule :

  • Index pattern : logs-*
  • Query : program:sshd AND message:Failed
  • Condition : when count() > 5 over the last 1 minute grouped by source.ip
  • Action : email/webhook ou indexation dans alerts-*

Livrable attendu

  • Preuves d'installation (status units), captures d'écran du dashboard Kibana, lien vers la règle d'alerte configurée.
  • JSON exporté du dashboard et de la règle (pour archivage et restitution).

Critères de réussite

  • ☐ Elasticsearch et Kibana accessibles sur ports 9200 et 5601.
  • ☐ Filebeat envoie bien des événements à Logstash/Elasticsearch (vérifier les logs Filebeat et l'index logs-*).
  • ☐ Dashboard montre clairement : tentatives SSH failed (4625), top IP sources, volume web Apache.
  • ☐ Règle brute-force se déclenche lorsque >5 échecs/minute et génère une action.

🏁 Synthèse (10 min)

Cette séance vous a permis de transformer des fichiers log en documents indexables dans Elasticsearch, de définir des mappings et templates, et de concevoir des analyses visuelles et des règles d'alerte opérationnelles. La prochaine étape consiste à industrialiser la collecte (agents, certificats TLS) et à rédiger une politique de rétention conforme RGPD et aux recommandations ANSSI.

❓ Question de vérification

En une phrase, expliquez la différence entre un shard primaire et son replica, et pourquoi les replicas sont importants pour la disponibilité.


📝 Quiz gamifié — Auto-évaluation (5 questions)

  1. Q1 (QCM) : Quelle est la différence fondamentale entre un shard primaire et un shard replica dans Elasticsearch ?

  2. Q2 (QCM) : Dans le pipeline ELK, quel est le rôle principal de Filebeat ?

  3. Q3 (Vrai / Faux) : "La directive grok dans Logstash permet de parser des logs en champs structurés en utilisant des expressions régulières nommées."

  4. Q4 (QCM) : Quel langage de requête utilise Kibana par défaut dans la vue Discover pour filtrer les événements ?

  5. Q5 (QCM) : Dans une politique ILM Elasticsearch, que décrit la phase warm ?

Le plugin de quiz vous donnera un score et enregistrera vos points pour le leaderboard.