📝 Séance 2 : ELK Stack — Filebeat, Elasticsearch et dashboards Kibana
| Formation | BTS SIO option SISR — IRIS Mediaschool |
|---|---|
| Bloc / Module | B2 · M2.4 — Compétence B2.4 / Lien B3.4 |
| Durée | 3h30 |
| Prérequis | Avoir complété la Séance 1 — rsyslog et centralisation initiale |
🎯 Introduction (10 min)
Dans un cas d'étude réel, une attaque n'a été découverte que trois mois après les faits parce que les logs étaient stockés uniquement localement et avaient été écrasés par la rotation quotidienne. Le manque de conservation centralisée et de corrélation a rendu le forensic impossible. Cette séance montre comment déployer un pipeline ELK (Elasticsearch, Logstash, Kibana) pour indexer, rechercher et alerter sur des événements critiques.
🎯 Objectifs de la séance
- Décrire l'architecture ELK et les concepts essentiels d'Elasticsearch (cluster, index, shards primaires et réplicas, mapping dynamique vs explicite).
- Déployer et configurer Filebeat pour collecter les logs locaux et modules (system, nginx) et ingérer dans Logstash / Elasticsearch.
- Créer des dashboards Kibana simples (tentatives SSH échouées, top IPs, trafic Apache) et une règle d'alerte brute-force SSH.
📖 1. Architecture Elasticsearch et concepts (50 min)
Elasticsearch est un moteur de recherche distribué qui stocke des documents JSON dans des index. Un cluster est composé de nœuds ; chaque index est partitionné en shards primaires, et chaque shard primaire peut avoir des shards réplicas. Les réplicas assurent la haute disponibilité et la capacité de lecture.
Le mapping définit le type et la structure des champs : il peut être dynamique (Elasticsearch infère les types à l'ingestion) ou explicite (on pré-déclare le mapping pour maîtriser l'analyse et éviter des erreurs de typage).
Concepts clés
| Concept | Description |
|---|---|
| Cluster | Ensemble de nœuds partageant les mêmes données et charge |
| Index | Collection de documents JSON avec un mapping commun |
| Shard primaire | Partition atomique de l'index, accepte les écritures |
| Shard réplica | Copie d'un primaire pour la HA et la lecture distribuée |
| Mapping dynamique | Types inférés automatiquement à l'ingestion |
| Mapping explicite | Types pré-déclarés pour contrôler l'analyse |
Commandes REST utiles
# État du cluster
curl -sS http://localhost:9200/_cluster/health?pretty
# Lister les indices
curl -sS http://localhost:9200/_cat/indices?v
# Rechercher dans un index
curl -sS -XPOST 'http://localhost:9200/logs-*/_search?pretty' \
-H 'Content-Type: application/json' \
-d '{"query":{"match_all":{}}}'
Définir un Index Template pour que les nouveaux indices logs-%{+YYYY.MM.dd} aient des mappings et des politiques ILM appropriées dès leur création.
📖 2. Logstash : pipeline de parsing (40 min)
Logstash reçoit les données (input), les parse (filter) et les envoie (output). Le pipeline suit toujours cette architecture en trois étapes.
Pipeline minimal
input {
beats { port => 5044 }
}
filter {
# grok pour parser syslog / apache / sshd
grok { match => { "message" => ["%{SYSLOGBASE}", "%{COMMONAPACHELOG}", "%{SSHLOG}"] } }
date { match => [ "timestamp", "ISO8601" ] }
mutate { remove_field => ["@version", "host"] }
}
output {
elasticsearch { hosts => ["localhost:9200"] index => "logs-%{+YYYY.MM.dd}" }
stdout { codec => rubydebug }
}
Grok patterns utiles
| Pattern | Usage |
|---|---|
%{SYSLOGTIMESTAMP:timestamp} %{HOSTNAME:host} sshd\[%{NUMBER:pid}\]: %{GREEDYDATA:message} | Logs sshd |
%{COMBINEDAPACHELOG} | Logs Apache Combined |
%{SYSLOG5424PRI:pri}%{ISO8601:timestamp} %{HOSTNAME:hostname} %{DATA:program} %{GREEDYDATA:msg} | Syslog RFC 5424 |
L'ordre des filtres est important : appliquer grok avant date, puis les enrichissements (geoip, user-agent) si nécessaire.
📖 3. Kibana : découverte, dashboards et alertes (40 min)
Kibana consomme les indices Elasticsearch via un index pattern (ex : logs-*). La vue Discover permet d'interroger avec KQL (Kibana Query Language) et d'isoler les événements.
Exemples de requêtes KQL
| Requête KQL | Description |
|---|---|
event.id:4625 | Échecs d'authentification Windows |
program:sshd and message:Failed | Tentatives SSH échouées |
http.response.status_code:5* | Erreurs serveur HTTP 5xx |
Widgets pour un dashboard opérationnel
- Bar chart :
auth failures per source IP (interval=1m) - Pie chart :
% errors by program - Metric :
total events / last 24h - Data table :
Top 10 source IPs by event count
Alertes — Kibana Rules
On peut définir une règle de type threshold ou EQL. Exemple logique pour brute-force SSH :
- Source :
program:sshd AND message:Failed - Condition :
WHEN count() IS ABOVE 5 FOR last 60 seconds GROUPED BY source.ip - Action : envoyer un e-mail et/ou créer un ticket via webhook.
📖 4. Filebeat : collecte et modules (20 min)
Filebeat est un agent léger pour collecter des fichiers log et disposer de modules prêts à l'emploi (system, nginx, apache).
Configuration minimale filebeat.yml
filebeat.inputs:
- type: log
enabled: true
paths:
- /var/log/*.log
filebeat.config.modules:
path: "/etc/filebeat/modules.d/*.yml"
reload.enabled: false
processors:
- add_host_metadata: ~
output.elasticsearch:
hosts: ["localhost:9200"]
Activer les modules system et nginx
sudo filebeat modules enable system nginx
sudo systemctl restart filebeat
Pour envoyer vers Logstash (parsing avancé) plutôt que directement vers Elasticsearch :
output.logstash:
hosts: ["localhost:5044"]
📖 5. Règles de corrélation SIEM basiques (20 min)
Quelques règles simples et efficaces pour débuter :
Brute-force SSH
Si event_count > 5 échecs (message contenant "Failed" ou Event ID 4625) depuis la même adresse source en 60s, déclencher alerte et blocage temporaire.
FROM events WHERE event.type IN (auth_fail)
GROUP BY src_ip
WINDOW 60s
HAVING count(*) > 5
=> ALERT
Authentification hors horaires
Connexions réussies (Event ID 4624 / sshd Accepted) entre 22h00 et 06h00 → alerte de suspicion.
Scan réseau
Plus de 50 connexions provenant d'une même IP vers des ports différents en 30s → suspicion de scan de ports.
Ces règles peuvent être implémentées dans Kibana (threshold/EQL) ou dans un moteur de corrélation dédié (ex : OpenSearch, Wazuh).
📖 6. Rétention, ILM et RGPD (10 min)
La gestion du cycle de vie des indices (ILM) permet d'automatiser le passage hot → warm → cold → delete.
| Phase ILM | Durée conseillée (PME) | Description |
|---|---|---|
| hot | 14 jours | Écriture et recherche rapide |
| warm | 3 mois | Recherche moins fréquente |
| cold | Jusqu'à 1 an | Archivé pour conservation |
| delete | Après 1 an | Suppression (si non requis légalement) |
L'ANSSI recommande une conservation minimale d'un an pour certains logs critiques. Pour la conformité RGPD, anonymiser ou tronquer les adresses IP après 6 mois (remplacement du dernier octet) et stocker uniquement les métadonnées nécessaires à la traçabilité.
💻 Travaux Pratiques (120 min)
Contexte
Vous déployez la pile ELK (version 8.x) sur debian-srv01 pour indexer les logs collectés lors de la séance 1. L'objectif est d'avoir une chaîne complète : Filebeat → Logstash → Elasticsearch (+Kibana) et des visualisations/alertes basiques.
Objectif
Installer Elasticsearch, Logstash, Kibana, Filebeat sur debian-srv01 ; configurer Filebeat avec modules system et nginx ; créer un dashboard Kibana montrant : tentatives SSH échouées (Event ID 4625), top IPs sources, volume trafic web Apache ; créer une règle d'alerte brute-force (>5 failures/minute).
Prérequis techniques
- Accès root sur
debian-srv01 - Ports ouverts :
9200(ES),5601(Kibana),5044(Logstash)
Étape 1 — Installer la stack ELK
# Clé GPG et dépôt Elastic 8.x
wget -qO - https://artifacts.elastic.co/GPG-KEY-elasticsearch | sudo apt-key add -
sudo apt-get install apt-transport-https -y
echo "deb https://artifacts.elastic.co/packages/8.x/apt stable main" \
| sudo tee /etc/apt/sources.list.d/elastic-8.x.list
sudo apt update
sudo apt install elasticsearch logstash kibana filebeat -y
Étape 2 — Démarrer et vérifier Elasticsearch / Kibana
sudo systemctl enable --now elasticsearch
# Vérifier le statut du cluster
curl -sS http://localhost:9200/_cluster/health?pretty
sudo systemctl enable --now kibana
Étape 3 — Configurer le pipeline Logstash
Fichier /etc/logstash/conf.d/50-beats.conf :
input { beats { port => 5044 } }
filter {
grok { match => { "message" => ["%{SYSLOGBASE}", "%{COMBINEDAPACHELOG}"] } }
date { match => ["timestamp", "ISO8601"] }
}
output {
elasticsearch { hosts => ["localhost:9200"] index => "logs-%{+YYYY.MM.dd}" }
}
sudo systemctl restart logstash
Étape 4 — Configurer Filebeat
Fichier /etc/filebeat/filebeat.yml :
filebeat.inputs:
- type: log
enabled: true
paths:
- /var/log/*.log
filebeat.config.modules:
path: "/etc/filebeat/modules.d/*.yml"
processors:
- add_host_metadata: ~
output.logstash:
hosts: ["localhost:5044"]
sudo filebeat modules enable system nginx
sudo filebeat setup # charge les dashboards Kibana fournis
sudo systemctl enable --now filebeat
Étape 5 — Créer le dashboard Kibana
- Dans Kibana, créer un index pattern
logs-*. - Discover : rechercher
event.id:4625 OR (program:sshd AND message:Failed)et sauvegarder la recherche. - Visualize : créer un bar chart
count by source.ippour afficher les top IPs. - Combiner les visualisations sur un dashboard : "SSH failures & Web traffic".
Étape 6 — Créer une règle d'alerte brute-force
Dans Kibana Detection Engine, créer une Threshold Rule :
- Index pattern :
logs-* - Query :
program:sshd AND message:Failed - Condition : when
count()> 5 over the last 1 minute grouped bysource.ip - Action : email/webhook ou indexation dans
alerts-*
Livrable attendu
- Preuves d'installation (status units), captures d'écran du dashboard Kibana, lien vers la règle d'alerte configurée.
- JSON exporté du dashboard et de la règle (pour archivage et restitution).
Critères de réussite
- ☐ Elasticsearch et Kibana accessibles sur ports
9200et5601. - ☐ Filebeat envoie bien des événements à Logstash/Elasticsearch (vérifier les logs Filebeat et l'index
logs-*). - ☐ Dashboard montre clairement : tentatives SSH failed (4625), top IP sources, volume web Apache.
- ☐ Règle brute-force se déclenche lorsque >5 échecs/minute et génère une action.
🏁 Synthèse (10 min)
Cette séance vous a permis de transformer des fichiers log en documents indexables dans Elasticsearch, de définir des mappings et templates, et de concevoir des analyses visuelles et des règles d'alerte opérationnelles. La prochaine étape consiste à industrialiser la collecte (agents, certificats TLS) et à rédiger une politique de rétention conforme RGPD et aux recommandations ANSSI.
En une phrase, expliquez la différence entre un shard primaire et son replica, et pourquoi les replicas sont importants pour la disponibilité.
📝 Quiz gamifié — Auto-évaluation (5 questions)
-
Q1 (QCM) : Quelle est la différence fondamentale entre un shard primaire et un shard replica dans Elasticsearch ?
-
Q2 (QCM) : Dans le pipeline ELK, quel est le rôle principal de Filebeat ?
-
Q3 (Vrai / Faux) : "La directive
grokdans Logstash permet de parser des logs en champs structurés en utilisant des expressions régulières nommées." -
Q4 (QCM) : Quel langage de requête utilise Kibana par défaut dans la vue Discover pour filtrer les événements ?
-
Q5 (QCM) : Dans une politique ILM Elasticsearch, que décrit la phase warm ?
Le plugin de quiz vous donnera un score et enregistrera vos points pour le leaderboard.
