containerd.lua:
--- Fluent Bit Lua filter callback applied to container log records.
-- Records that carry a "logtag" field (presumably produced by the "cri"
-- parser -- confirm against parsers.conf) get a UTC "time" string built from
-- the event timestamp and have their "message" field renamed to "log".
-- Records without "logtag" are passed through untouched.
-- @param tag       tag of the record (unused)
-- @param timestamp table with numeric fields "sec" and "nsec"
-- @param record    table holding the record's fields
-- @return 1, timestamp, record when the record was rewritten;
--         0, timestamp, record when it was left as is
function containerd(tag, timestamp, record)
    -- No logtag: report "not modified" so the record is kept unchanged.
    if record["logtag"] == nil then
        return 0, timestamp, record
    end

    -- Keep intermediates local so nothing leaks into the global environment
    -- shared by every invocation of the script.
    local utc = os.date("!*t", timestamp["sec"])

    -- Nanoseconds are zero-padded to nine digits: without padding a value
    -- such as nsec=5000 would be rendered as ".5000" (half a second) instead
    -- of ".000005000". A missing nsec is treated as 0.
    record["time"] = string.format("%4d-%02d-%02dT%02d:%02d:%02d.%09dZ",
        utc["year"], utc["month"], utc["day"],
        utc["hour"], utc["min"], utc["sec"],
        timestamp["nsec"] or 0)

    record["log"] = record["message"]
    record["message"] = nil
    return 1, timestamp, record
end
fluent-bit.conf:
# Global service settings. Parsers_File points at parsers.conf; the "cri" and
# "json" parsers referenced further down are presumably defined there.
[Service]
Parsers_File parsers.conf
# Collect journal entries of crio.service and tag them service.crio.
# This input uses a dedicated cursor database: pointing it at
# /fluent-bit/tail/docker.db (the file used by the docker.service input)
# would let the two inputs overwrite each other's saved journal position.
# NOTE(review): with a new DB file this input starts once without a saved cursor.
[Input]
Name systemd
Path /var/log/journal
DB /fluent-bit/tail/crio.db
DB.Sync Normal
Tag service.crio
Systemd_Filter _SYSTEMD_UNIT=crio.service
# Collect journal entries of docker.service and tag them service.docker.
# DB is the file where this input persists its read position.
[Input]
Name systemd
Path /var/log/journal
DB /fluent-bit/tail/docker.db
DB.Sync Normal
Tag service.docker
Systemd_Filter _SYSTEMD_UNIT=docker.service
# Collect journal entries of kubelet.service and tag them service.kubelet.
# DB is the file where this input persists its read position.
[Input]
Name systemd
Path /var/log/journal
DB /fluent-bit/tail/kubelet.db
DB.Sync Normal
Tag service.kubelet
Systemd_Filter _SYSTEMD_UNIT=kubelet.service
# Tail all container log files, parse each line with the "cri" parser and
# tag the records kube.*.
# Exclude_Path leaves out the events-exporter and kube-auditing-webhook files,
# which are read by the two dedicated tail inputs that follow.
# File offsets are persisted in pos.db.
[Input]
Name tail
Path /var/log/containers/*.log
Exclude_Path /var/log/containers/*_kubesphere-logging-system_events-exporter*.log,/var/log/containers/kube-auditing-webhook*_kubesphere-logging-system_kube-auditing-webhook*.log
Refresh_Interval 10
Skip_Long_Lines true
DB /fluent-bit/tail/pos.db
DB.Sync Normal
Mem_Buf_Limit 5MB
Parser cri
Tag kube.*
# Tail only the kube-auditing-webhook container logs (the files excluded from
# the generic kube.* input) and tag them kube_auditing.
# File offsets are persisted in a separate database, pos-auditing.db.
[Input]
Name tail
Path /var/log/containers/kube-auditing-webhook*_kubesphere-logging-system_kube-auditing-webhook*.log
Refresh_Interval 10
Skip_Long_Lines true
DB /fluent-bit/tail/pos-auditing.db
DB.Sync Normal
Mem_Buf_Limit 5MB
Parser cri
Tag kube_auditing
# Tail only the events-exporter container logs (the files excluded from the
# generic kube.* input) and tag them kube_events.
# File offsets are persisted in a separate database, pos-events.db.
[Input]
Name tail
Path /var/log/containers/*_kubesphere-logging-system_events-exporter*.log
Refresh_Interval 10
Skip_Long_Lines true
DB /fluent-bit/tail/pos-events.db
DB.Sync Normal
Mem_Buf_Limit 5MB
Parser cri
Tag kube_events
# Run the containerd() function from containerd.lua on every kube.* record.
# time_as_table passes the timestamp as a table; the script reads its "sec"
# and "nsec" fields.
[Filter]
Name lua
Match kube.*
script /fluent-bit/config/containerd.lua
call containerd
time_as_table true
# Auditing pipeline, step 1: parse the "message" field of kube_auditing
# records with the "json" parser.
[Filter]
Name parser
Match kube_auditing
Key_Name message
Parser json
# Step 2: mark records that have no AuditID key by adding ignore=true.
[Filter]
Name modify
Match kube_auditing
Condition Key_does_not_exist AuditID
Add ignore true
# Step 3: drop the records marked in step 2, so only records carrying an
# AuditID key continue to the output.
[Filter]
Name grep
Match kube_auditing
Exclude ignore true
# Parse the "message" field of kube_events records with the "json" parser.
[Filter]
Name parser
Match kube_events
Key_Name message
Parser json
# Enrich kube.* records with Kubernetes metadata fetched from the API server,
# authenticating with the pod's service-account CA certificate and token.
# Pod labels and annotations are not added to the records.
[Filter]
Name kubernetes
Match kube.*
Kube_URL https://kubernetes.default.svc:443
Kube_CA_File /var/run/secrets/kubernetes.io/serviceaccount/ca.crt
Kube_Token_File /var/run/secrets/kubernetes.io/serviceaccount/token
Labels false
Annotations false
# Metadata clean-up, step 1: lift the keys nested under "kubernetes" to the
# top level, prefixing each with "kubernetes_".
[Filter]
Name nest
Match kube.*
Operation lift
Nested_under kubernetes
Add_prefix kubernetes_
# Step 2: remove the fields that should not be shipped (stream, pod id, host,
# container hash).
[Filter]
Name modify
Match kube.*
Remove stream
Remove kubernetes_pod_id
Remove kubernetes_host
Remove kubernetes_container_hash
# Step 3: put the remaining kubernetes_* keys back under "kubernetes" and
# strip the temporary prefix again.
[Filter]
Name nest
Match kube.*
Operation nest
Wildcard kubernetes_*
Nest_under kubernetes
Remove_prefix kubernetes_
# Run the add_time() function from systemd.lua on every service.* record
# (the records produced by the systemd inputs).
# time_as_table passes the timestamp as a table; the script reads its "sec"
# and "nsec" fields.
[Filter]
Name lua
Match service.*
script /fluent-bit/config/systemd.lua
call add_time
time_as_table true
# Ship container (kube.*) and service (service.*) logs to Elasticsearch.
# The regex requires a literal "kube." or "service." in the tag, so the
# kube_auditing and kube_events tags are not matched here.
# Indices use the ks-logstash-log prefix; the record time is written to the
# @timestamp key.
[Output]
Name es
Match_Regex (?:kube|service)\.(.*)
Host 10.33.10.17
Port 9200
Logstash_Format true
Logstash_Prefix ks-logstash-log
Time_Key @timestamp
Generate_ID true
Trace_Error true
# Ship kube_auditing records to Elasticsearch under the ks-logstash-auditing
# index prefix.
[Output]
Name es
Match kube_auditing
Host 10.33.10.17
Port 9200
Logstash_Format true
Logstash_Prefix ks-logstash-auditing
Generate_ID true
# Ship kube_events records to Elasticsearch under the ks-logstash-events
# index prefix.
[Output]
Name es
Match kube_events
Host 10.33.10.17
Port 9200
Logstash_Format true
Logstash_Prefix ks-logstash-events
Generate_ID true
systemd.lua:
--- Fluent Bit Lua filter callback applied to systemd journal records.
-- Replaces the record with a new one shaped like a container log record:
-- a UTC "time" string built from the event timestamp, the journal MESSAGE as
-- "log", and a "kubernetes" table derived from journal fields.
-- @param tag       tag of the record (unused)
-- @param timestamp table with numeric fields "sec" and "nsec"
-- @param record    table holding the journal entry's fields; "_HOSTNAME",
--                  "SYSLOG_IDENTIFIER" and "MESSAGE" are read
-- @return 1, timestamp, new_record (the record is always replaced)
function add_time(tag, timestamp, record)
    -- Keep intermediates local so nothing leaks into the global environment
    -- shared by every invocation of the script.
    local utc = os.date("!*t", timestamp["sec"])

    -- Nanoseconds are zero-padded to nine digits: without padding a value
    -- such as nsec=5000 would be rendered as ".5000" (half a second) instead
    -- of ".000005000". A missing nsec is treated as 0.
    local time_text = string.format("%4d-%02d-%02dT%02d:%02d:%02d.%09dZ",
        utc["year"], utc["month"], utc["day"],
        utc["hour"], utc["min"], utc["sec"],
        timestamp["nsec"] or 0)

    local new_record = {
        time = time_text,
        log = record["MESSAGE"],
        kubernetes = {
            -- The host name stands in for the pod name and the syslog
            -- identifier for the container name.
            pod_name = record["_HOSTNAME"],
            container_name = record["SYSLOG_IDENTIFIER"],
            namespace_name = "kube-system",
        },
    }
    return 1, timestamp, new_record
end