In this post we'll show you several example Logstash filters. They are very useful for feeding data into Kibana; just don't forget to add the corresponding Filebeat prospector for each log type.
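All of the filters below assume that events arrive from Filebeat through the beats input and leave through the elasticsearch output; the [type] conditionals also assume Filebeat sets that field (via document_type in older Filebeat prospectors, or a field you add yourself). A minimal input/output sketch, with assumed file names, port, and host:

# /etc/logstash/conf.d/02-beats-input.conf (example name)
input {
  beats {
    port => 5044
  }
}

# /etc/logstash/conf.d/30-elasticsearch-output.conf (example name)
output {
  elasticsearch {
    hosts => ["localhost:9200"]
    # one index per beat and day, e.g. filebeat-2018.01.31
    index => "%{[@metadata][beat]}-%{+YYYY.MM.dd}"
  }
}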
For syslog we'll create the file /etc/logstash/conf.d/10-syslog-filter.conf:
filter {
  if [type] == "syslog" {
    grok {
      match => { "message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}(?:\[%{POSINT:syslog_pid}\])?: %{GREEDYDATA:syslog_message}" }
      add_field => [ "received_at", "%{@timestamp}" ]
      add_field => [ "received_from", "%{host}" ]
    }
    syslog_pri { }
    date {
      # note the double space before "d": syslog pads single-digit days
      match => [ "syslog_timestamp", "MMM  d HH:mm:ss", "MMM dd HH:mm:ss" ]
    }
  }
}
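Before wiring a new filter into the pipeline, it helps to run it in isolation and inspect the parsed event. A minimal test sketch (stdin/stdout instead of Beats/Elasticsearch; the file name is made up):

# test-syslog.conf — paste a log line on stdin and print the parsed event
input {
  stdin {
    type => "syslog"   # matches the [type] conditional in the filter above
  }
}
output {
  stdout { codec => rubydebug }
}

Copy the filter block above into the same file, run bin/logstash -f test-syslog.conf, paste a syslog line, and check that the syslog_* fields come out as expected.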
For auth logs we'll create the file /etc/logstash/conf.d/10-auth-filter.conf:
filter {
  if [type] == "authlog" {
    grok {
      match => { "message" => [
        "%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} sshd(?:\[%{POSINT:[system][auth][pid]}\])?: %{DATA:[system][auth][ssh][event]} %{DATA:[system][auth][ssh][method]} for (invalid user )?%{DATA:[system][auth][user]} from %{IPORHOST:[system][auth][ssh][ip]} port %{NUMBER:[system][auth][ssh][port]} ssh2(: %{GREEDYDATA:[system][auth][ssh][signature]})?",
        "%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} sshd(?:\[%{POSINT:[system][auth][pid]}\])?: %{DATA:[system][auth][ssh][event]} user %{DATA:[system][auth][user]} from %{IPORHOST:[system][auth][ssh][ip]}",
        "%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} sshd(?:\[%{POSINT:[system][auth][pid]}\])?: Did not receive identification string from %{IPORHOST:[system][auth][ssh][dropped_ip]}",
        "%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} sudo(?:\[%{POSINT:[system][auth][pid]}\])?: \s*%{DATA:[system][auth][user]} :( %{DATA:[system][auth][sudo][error]} ;)? TTY=%{DATA:[system][auth][sudo][tty]} ; PWD=%{DATA:[system][auth][sudo][pwd]} ; USER=%{DATA:[system][auth][sudo][user]} ; COMMAND=%{GREEDYDATA:[system][auth][sudo][command]}",
        "%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} groupadd(?:\[%{POSINT:[system][auth][pid]}\])?: new group: name=%{DATA:[system][auth][groupadd][name]}, GID=%{NUMBER:[system][auth][groupadd][gid]}",
        "%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} useradd(?:\[%{POSINT:[system][auth][pid]}\])?: new user: name=%{DATA:[system][auth][user][add][name]}, UID=%{NUMBER:[system][auth][user][add][uid]}, GID=%{NUMBER:[system][auth][user][add][gid]}, home=%{DATA:[system][auth][user][add][home]}, shell=%{DATA:[system][auth][user][add][shell]}$",
        "%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} %{DATA:[system][auth][program]}(?:\[%{POSINT:[system][auth][pid]}\])?: %{GREEDYMULTILINE:[system][auth][message]}"
      ] }
      pattern_definitions => {
        "GREEDYMULTILINE" => "(.|\n)*"
      }
      remove_field => "message"
    }
    date {
      match => [ "[system][auth][timestamp]", "MMM  d HH:mm:ss", "MMM dd HH:mm:ss" ]
    }
    geoip {
      source => "[system][auth][ssh][ip]"
      target => "[system][auth][ssh][geoip]"
    }
  }
}
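Lines that match none of the grok patterns are tagged _grokparsefailure. Rather than letting them disappear into an unparsed heap, you can mark them explicitly; a minimal sketch (the parse_status field name is made up):

filter {
  if "_grokparsefailure" in [tags] {
    # keep the event but flag it so unparsed lines are easy to find in Kibana
    mutate { add_field => { "parse_status" => "failed" } }
  }
}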
For Apache access logs we'll create the file /etc/logstash/conf.d/11-apache-log.conf:
filter {
  if [type] == "apache-access" {
    grok {
      match => { "message" => [
        "%{IPORHOST:[apache2][access][remote_ip]} - %{DATA:[apache2][access][user_name]} \[%{HTTPDATE:[apache2][access][time]}\] \"%{WORD:[apache2][access][method]} %{DATA:[apache2][access][url]} HTTP/%{NUMBER:[apache2][access][http_version]}\" %{NUMBER:[apache2][access][response_code]} %{NUMBER:[apache2][access][body_sent][bytes]}( \"%{DATA:[apache2][access][referrer]}\")?( \"%{DATA:[apache2][access][agent]}\")?",
        "%{IPORHOST:[apache2][access][remote_ip]} - %{DATA:[apache2][access][user_name]} \[%{HTTPDATE:[apache2][access][time]}\] \"-\" %{NUMBER:[apache2][access][response_code]} -"
      ] }
      remove_field => "message"
    }
    geoip {
      source => "[apache2][access][remote_ip]"
      target => "geoip"
      database => "/etc/logstash/GeoLite2-City.mmdb"
      add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ]
      add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}" ]
      add_tag => [ "apache-geoip" ]
    }
    mutate {
      convert => {
        "[apache2][access][response_code]" => "integer"
        "[apache2][access][body_sent][bytes]" => "integer"
        "[geoip][coordinates]" => "float"
      }
      add_field => [ "src_ip", "%{[apache2][access][remote_ip]}" ]
    }
    date {
      match => [ "[apache2][access][time]", "dd/MMM/YYYY:HH:mm:ss Z" ]
      remove_field => [ "[apache2][access][time]" ]
    }
    useragent {
      source => "[apache2][access][agent]"
      target => "useragent"
      remove_field => [ "[apache2][access][agent]" ]
    }
  }
}
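Since response_code is converted to an integer above, you can build ordinary numeric conditionals on top of it, for example to tag server errors for a Kibana dashboard. A minimal sketch (the tag name is made up):

filter {
  if [type] == "apache-access" and [apache2][access][response_code] {
    if [apache2][access][response_code] >= 500 {
      # flag 5xx responses for easy filtering
      mutate { add_tag => [ "apache-server-error" ] }
    }
  }
}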
For Apache error logs we'll create the file /etc/logstash/conf.d/11-apache-error.conf:
filter {
  if [type] == "apache-error" {
    grok {
      match => { "message" => [
        "\[%{APACHE_TIME:[apache2][error][timestamp]}\] \[%{LOGLEVEL:[apache2][error][level]}\]( \[client %{IPORHOST:[apache2][error][client]}\])? %{GREEDYDATA:[apache2][error][message]}",
        "\[%{APACHE_TIME:[apache2][error][timestamp]}\] \[%{DATA:[apache2][error][module]}:%{LOGLEVEL:[apache2][error][level]}\] \[pid %{NUMBER:[apache2][error][pid]}(:tid %{NUMBER:[apache2][error][tid]})?\]( \[client %{IPORHOST:[apache2][error][client]}\])? %{GREEDYDATA:[apache2][error][message1]}"
      ] }
      pattern_definitions => {
        "APACHE_TIME" => "%{DAY} %{MONTH} %{MONTHDAY} %{TIME} %{YEAR}"
      }
      remove_field => "message"
    }
    mutate {
      rename => { "[apache2][error][message1]" => "[apache2][error][message]" }
    }
    date {
      match => [ "[apache2][error][timestamp]", "EEE MMM dd H:m:s YYYY", "EEE MMM dd H:m:s.SSSSSS YYYY" ]
      remove_field => "[apache2][error][timestamp]"
    }
  }
}
For Nginx we'll create the file /etc/logstash/conf.d/12-nginx-log.conf:
filter {
  if [type] == "nginx-access" {
    grok {
      match => [ "message", "%{COMBINEDAPACHELOG}+%{GREEDYDATA:extra_fields}" ]
      overwrite => [ "message" ]
      remove_tag => [ "beats_input_codec_plain_applied" ]
    }
    geoip {
      source => "clientip"
      target => "geoip"
      database => "/etc/logstash/GeoLite2-City.mmdb"
      add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ]
      add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}" ]
      add_tag => [ "nginx-geoip" ]
    }
    mutate {
      convert => [ "response", "integer" ]
      convert => [ "bytes", "integer" ]
      convert => [ "responsetime", "float" ]
      add_field => [ "src_ip", "%{clientip}" ]
      convert => [ "[geoip][coordinates]", "float" ]
      replace => [ "@source_host", "%{host}" ]
      replace => [ "@message", "%{message}" ]
      rename => [ "verb", "method" ]
    }
    date {
      match => [ "timestamp", "dd/MMM/YYYY:HH:mm:ss Z" ]
      remove_field => [ "timestamp" ]
    }
    useragent {
      source => "agent"
    }
  }
  if [type] == "nginx-error" {
    grok {
      match => [
        "message", "(?<timestamp>\d{4}/\d{2}/\d{2} \d{2}:\d{2}:\d{2}) \[%{DATA:err_severity}\] (%{NUMBER:pid:int}#%{NUMBER}: \*%{NUMBER}|\*%{NUMBER}) %{DATA:err_message}(?:, client: (?<client>%{IP}|%{HOSTNAME}))(?:, server: %{IPORHOST:server})(?:, request: %{QS:request})?(?:, host: %{QS:client_ip})?(?:, referrer: \"%{URI:referrer})?",
        "message", "%{DATESTAMP:timestamp} \[%{DATA:err_severity}\] %{GREEDYDATA:err_message}"
      ]
      remove_tag => [ "beats_input_codec_plain_applied", "_grokparsefailure" ]
    }
  }
}
For MySQL error logs we'll create the file /etc/logstash/conf.d/14-mysql-error.conf:
filter {
  if [type] == "mysql-error" {
    grok {
      match => { "message" => [
        "%{LOCALDATETIME:[mysql][error][timestamp]} (\[%{DATA:[mysql][error][level]}\] )?%{GREEDYDATA:[mysql][error][message]}",
        "%{TIMESTAMP_ISO8601:[mysql][error][timestamp]} %{NUMBER:[mysql][error][thread_id]} \[%{DATA:[mysql][error][level]}\] %{GREEDYDATA:[mysql][error][message1]}",
        "%{GREEDYDATA:[mysql][error][message2]}"
      ] }
      pattern_definitions => {
        "LOCALDATETIME" => "[0-9]+ %{TIME}"
      }
      remove_field => "message"
    }
    mutate {
      rename => { "[mysql][error][message1]" => "[mysql][error][message]" }
    }
    mutate {
      rename => { "[mysql][error][message2]" => "[mysql][error][message]" }
    }
    date {
      match => [ "[mysql][error][timestamp]", "ISO8601", "YYMMdd H:m:s" ]
      remove_field => "[mysql][error][timestamp]"
    }
  }
}
For MySQL slow queries we'll create the file /etc/logstash/conf.d/14-mysql-slow.conf. Note that this one relies on the multiline filter plugin, which no longer ships with recent Logstash releases; on current stacks, multiline events are usually assembled on the Filebeat side (the prospector's multiline settings) instead.
filter {
  if [type] == "mysql-slow" {
    if [message] =~ "^# Time:.*$" {
      drop {}
    }
    multiline {
      pattern => "^# User@Host:.*$"
      negate => true
      what => "previous"
    }
    grok {
      match => [
        "message", "(?m)^# User@Host: %{GREEDYDATA:user}\[%{GREEDYDATA}\] @ \[%{IP:client_ip}\]\s*# Query_time: %{NUMBER:query_time:float} Lock_time: %{NUMBER:query_lock_time:float} Rows_sent: %{NUMBER:query_rows_sent:int} Rows_examined: %{NUMBER:query_rows_examined:int}\s*SET timestamp=%{NUMBER:log_timestamp};\s*%{GREEDYDATA:query}$",
        "message", "(?m)^# User@Host: %{GREEDYDATA:user}\[%{GREEDYDATA}\] @ \[%{IP:client_ip}\]\s*# Query_time: %{NUMBER:query_time:float} Lock_time: %{NUMBER:query_lock_time:float} Rows_sent: %{NUMBER:query_rows_sent:int} Rows_examined: %{NUMBER:query_rows_examined:int}\s*use %{GREEDYDATA:database};\s*SET timestamp=%{NUMBER:log_timestamp};\s*%{GREEDYDATA:query}$",
        "message", "(?m)^# User@Host: %{GREEDYDATA:user}\[%{GREEDYDATA}\] @ %{GREEDYDATA:client_ip} \[\]\s*# Query_time: %{NUMBER:query_time:float} Lock_time: %{NUMBER:query_lock_time:float} Rows_sent: %{NUMBER:query_rows_sent:int} Rows_examined: %{NUMBER:query_rows_examined:int}\s*SET timestamp=%{NUMBER:log_timestamp};\s*%{GREEDYDATA:query}$"
      ]
      remove_tag => [ "beats_input_codec_plain_applied", "_grokparsefailure" ]
    }
    date {
      match => [ "log_timestamp", "UNIX" ]
    }
    mutate {
      remove_field => "log_timestamp"
    }
  }
}
Using GeoIP
For Logstash to geolocate IPs we need the GeoLite2-City.mmdb database. (MaxMind now requires a free account and license key to download GeoLite2 databases, so the direct URL below may no longer work as-is.) To fetch it:
# cd /etc/logstash/
# wget http://geolite.maxmind.com/download/geoip/database/GeoLite2-City.tar.gz
# tar -xzvf GeoLite2-City.tar.gz
# mv GeoLite2-City_20*/GeoLite2-City.mmdb ./
# systemctl restart logstash.service
If you don't want to use geolocation, comment out the geoip block in each filter:
# geoip {
# source => "clientip"
# target => "geoip"
# database => "/etc/logstash/GeoLite2-City.mmdb"
# add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ]
# add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}" ]
# add_tag => [ "nginx-geoip" ]
# }
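Bear in mind that GeoIP lookups only make sense for public addresses; private ranges are not in the database and just add a _geoip_lookup_failure tag to the event. A minimal sketch of wrapping the geoip block in a guard, using the clientip field from the Nginx example (the private-range regex is deliberately simplified):

filter {
  if [type] == "nginx-access" and [clientip] and [clientip] !~ /^(10\.|127\.|192\.168\.)/ {
    geoip {
      source => "clientip"
      target => "geoip"
      database => "/etc/logstash/GeoLite2-City.mmdb"
    }
  }
}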
Source: Bubbl ELK and Elastic.