ELK

install

cat <<EOF > /etc/yum.repos.d/elastic.repo
[logstash-7.x]
name=Elastic repository for 7.x packages
baseurl=https://artifacts.elastic.co/packages/7.x/yum
gpgcheck=1
gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch
enabled=1
autorefresh=1
type=rpm-md
EOF

Ship files from COS (Tencent Cloud Object Storage) to Elasticsearch through Logstash

input {
  s3 {
    access_key_id => "xxx"
    secret_access_key => "xxx"
    endpoint => "https://cos.ap-xxx.myqcloud.com"
    bucket => "xxx"
    region => "ap-xxx"
    interval => 120
  }
}

filter {
  mutate {
    add_field => {
      "file" => "%{[@metadata][s3][key]}"
    }
  }
  dissect {
    mapping => {
      "file" => "%{EnvName}/%{PublicIpv4}-%{PackageVersion}/%{FileName}.log"
    }
  }
}

output {
  elasticsearch {
    hosts => ["http://xxx:9200"]
    index => "llc-test"
    #document_id => "%{id}"
  }
  # rubydebug is a console codec; use it on stdout for debugging,
  # not inside the elasticsearch output
  stdout {
    codec => rubydebug { metadata => true }
  }
}

logstash

plugins


logstash-plugin --help
Using bundled JDK: /usr/share/logstash/jdk
OpenJDK 64-Bit Server VM warning: Option UseConcMarkSweepGC was deprecated in version 9.0 and will likely be removed in a future release.
Usage:
    bin/logstash-plugin [OPTIONS] SUBCOMMAND [ARG] ...

Parameters:
    SUBCOMMAND                    subcommand
    [ARG] ...                     subcommand arguments

Subcommands:
    list                          List all installed Logstash plugins
    install                       Install a Logstash plugin
    remove                        Remove a Logstash plugin
    update                        Update a plugin
    pack                          Package currently installed plugins, Deprecated: Please use prepare-offline-pack instead
    unpack                        Unpack packaged plugins, Deprecated: Please use prepare-offline-pack instead
    generate                      Create the foundation for a new plugin
    uninstall                     Uninstall a plugin. Deprecated: Please use remove instead
    prepare-offline-pack          Create an archive of specified plugins to use for offline installation

Options:
    -h, --help                    print help

yum -y install libmaxminddb-devel-1.2.0-6.el7.x86_64

debug

https://www.elastic.co/guide/en/logstash/current/running-logstash-command-line.html

Test a pipeline with logstash -f test.conf; note that this may fail if the machine is short on memory.

grok

Official grok documentation: https://www.elastic.co/guide/en/logstash/current/plugins-filters-grok.html

Both grok and dissect can be used for parsing and matching. For logs with a highly repetitive, delimiter-based structure, prefer dissect; grok is regex-based and its patterns can also convert data types (e.g. %{NUMBER:duration:float}).

Note

grok {
    match => {
      "xxx" => "YOUR PATTERN"
    }
}

An example: matching supervisor logs

grok {
  match => {
    "message" => "%{TIMESTAMP_ISO8601:logdate} level=%{LOGLEVEL:loglevel} msg=\"%{GREEDYDATA:logmessage}\""
  }
}

dissect
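A dissect mapping can parse the same supervisor-style line as the grok example above without regular expressions. A sketch, with field names borrowed from that example:

```
filter {
  dissect {
    # dissect splits on literal delimiters instead of regular expressions,
    # which makes it faster for rigidly formatted lines
    mapping => {
      "message" => '%{logdate} level=%{loglevel} msg="%{logmessage}"'
    }
  }
}
```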

csv
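For comma-separated logs, the csv filter maps columns to named fields. A minimal sketch; the separator and column names below are assumptions for illustration:

```
filter {
  csv {
    separator => ","
    # assumed column names, replace with the real layout
    columns => ["timestamp", "client_ip", "status"]
  }
}
```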

geoip

Free GeoIP databases

https://mailfud.org/geoip-legacy/

https://github.com/P3TERX/GeoLite.mmdb/releases/tag/2022.06.10
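Once one of the databases above is downloaded, the geoip filter can enrich events with location data. A sketch; the source field and database path are assumptions:

```
filter {
  geoip {
    source => "client_ip"                           # assumed field holding the IP
    database => "/etc/logstash/GeoLite2-City.mmdb"  # path to the downloaded mmdb
    target => "geoip"
  }
}
```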

Resolving the real client IP from the HTTP X-Forwarded-For header

filter {
    mutate {
        replace => { "host" => "%{[@metadata][input][http][request][headers][x-forwarded-for]}" } 
    }
}

Kibana

https://www.elastic.co/guide/cn/kibana/current/index.html

KQL

The official documentation is worth reading in full: https://www.elastic.co/guide/en/kibana/current/kuery-query.html

First, note that KQL can only be used to filter data; it cannot sort or aggregate, and it does not support regular expressions.

Conditional queries

my_field:1 2 3 # my_field is 1, 2 or 3
http.response.status_code:400 401 404 # the status code is 400, 401 or 404
http.response.body.content.text:"liuliancao.com" # the content is liuliancao.com
AND / OR / NOT

my_field:1 or test_field:2 # my_field contains 1 or test_field contains 2
my_field:1 and test_field:2 # my_field contains 1 and test_field contains 2
my_field:(1 or 2) # my_field contains 1 or 2
my_field:1 and not test_field:3 # my_field contains 1 and test_field does not contain 3
tags:(success and info and security) # tags contains success, info and security
Range queries

>=, >, <, <=

my_field > 80 # my_field is greater than 80
Time queries
@timestamp < "2022-07-02T08:48:45"
@timestamp < "2022-07"
@timestamp < now-1d
Checking whether a field exists

Existence: your_field:*

blog:* # the blog field exists
Wildcard matching

blog:*.liuliancao.com # blog matches *.liuliancao.com
blog_*: liuliancao.com # any blog_* field matches liuliancao.com
Matching nested fields

Suppose there is a field like:

{
  "items": [
    {
      "name": "xiaoming",
      "score": 59
    },
    {
      "name": "xiaohei",
      "score": 61
    },
    {
      "name": "xiaobai",
      "score": 63
    }
  ]
}

Match names with score >= 60:
items:{ score >= 60 }

save query to csv

Share your query first, then click Generate CSV.

management –> reports

download

canvas

https://www.elastic.co/guide/en/kibana/current/canvas.html Canvas can be used to build better-looking, more customizable visualizations.

export visualizations

management –> saved objects –> export

management –> saved objects –> import

FAQ

Remapping the timestamp

grok {
  match => {
    "message" => "time=\"%{TIMESTAMP_ISO8601:logdate}\" level=%{LOGLEVEL:loglevel} msg=\"%{GREEDYDATA:logmessage}\""
  }
}
date {
  match => [ "logdate", "ISO8601" ]
  target => "@timestamp"
  timezone => "Asia/Shanghai"
}

Timestamps in Logstash

https://www.elastic.co/guide/en/logstash/current/plugins-filters-date.html

Daily Elasticsearch indices

index => "test-%{+YYYY.MM.dd}"

Dropping useless messages

# in filter
if [message] !~ /xxx/ {
  drop {}
}

Enabling X-Pack

https://www.elastic.co/cn/what-is/open-x-pack

https://linuxhint.com/enable-xpack-elasticsearch/

Automatically appending a timestamp

https://stackoverflow.com/questions/17136138/how-to-make-elasticsearch-add-the-timestamp-field-to-every-document-in-all-indic

PUT _ingest/pipeline/indexed_at
{
  "description": "Adds indexed_at timestamp to documents",
  "processors": [
    {
      "set": {
        "field": "_source.indexed_at",
        "value": "{{_ingest.timestamp}}"
      }
    }
  ]
}

PUT ms-test/_settings
{
  "index.default_pipeline": "indexed_at"
}
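Before attaching the pipeline to more indices, it can be checked with the ingest simulate API; the sample document below is arbitrary:

```
POST _ingest/pipeline/indexed_at/_simulate
{
  "docs": [
    { "_source": { "message": "hello" } }
  ]
}
```

The response shows each document after the pipeline's processors have run, so you can confirm indexed_at was set.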

Merging multiple Elasticsearch indices into one

POST _reindex
{
  "source": {
    "index": "xxx-2023.05.*"
  },
  "dest": {
    "index": "xxx-archive-2023.05"
  }
}

Getting multi-line shell output into Elasticsearch

Suppose the payload is a traceroute result:

timeout 30 traceroute www.baidu.com > ./traceroute.out
# convert real newlines into the literal two-character sequence \n
traceroute_result="$(cat ./traceroute.out | tr '\n' '%' | sed 's/%/\\n/g')"
curl -XPOST -H "Content-Type: application/json" -u $ES_AUTH --data-binary @- $ES_SERVER/traceroute-statistics/doc <<EOF
{"region": "$REGION","traceroute": "$traceroute_result", "protocol": "$PROBE_TYPE", "host": "$PROBE_HOST", "port": "$PROBE_PORT","date":"$(date -u +%Y-%m-%dT%H:%M:%SZ)"}
EOF

The main task is just to convert the real newlines into literal \n sequences.
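The conversion can be checked on its own, without traceroute; this sketch uses printf to stand in for the multi-line command output:

```shell
# Replace real newlines with the literal two-character sequence \n,
# so the text fits inside a single-line JSON string.
printf 'hop1\nhop2\nhop3\n' > /tmp/trace.out
result="$(cat /tmp/trace.out | tr '\n' '%' | sed 's/%/\\n/g')"
printf '%s\n' "$result"    # hop1\nhop2\nhop3\n
```

Note this assumes the command output contains no literal % characters, since % is used as the intermediate placeholder.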

setting 'filebeat.prospectors' has been removed

Rename it to filebeat.inputs.
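With the newer filebeat.inputs syntax, a minimal log input might look like this; the path and Logstash endpoint are assumptions:

```
filebeat.inputs:
  - type: log
    enabled: true
    paths:
      - /var/log/supervisor/*.log   # assumed log path
output.logstash:
  hosts: ["localhost:5044"]         # assumed Logstash endpoint
```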