EK + filebeat 平台搭建及使用
Contents
ES
依赖: >=JDK8
下载完直接解压即可.
开启外网IP访问
vim config/elasticsearch.yml
修改 network.host 为
network.host: 0.0.0.0
一般不直接使用外网配置, 可以使用 nginx 代理, 然后在Nginx 加个简单的认证配置即可. 建议直接写明局域网的IP地址, 然后通过 nginx 代理即可.
访问地址: IP:9200
nginx 代理 + 认证
nginx 配置
location / {
auth_basic "欢迎来到监控中心 ^_^";
auth_basic_user_file /home/company/nginx/nginx-1.12.2/conf/htpasswd;
proxy_pass http://10.161.29.39:5601;
proxy_redirect off;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
client_max_body_size 200m;
client_body_buffer_size 512k;
proxy_connect_timeout 90;
proxy_send_timeout 90;
proxy_read_timeout 90;
proxy_buffer_size 256k;
proxy_buffers 4 256k;
proxy_busy_buffers_size 512k;
proxy_temp_file_write_size 512k;
}
生成密码:
openssl passwd -crypt helloworld
Warning: truncating password to 8 characters
sOX8YFAkZbi7M
yourusername:sOX8YFAkZbi7M
reload 完 nginx 后, 就可以用 用户名 yourusername
, 密码为 helloworld
来登录了
启动报错
ERROR: [2] bootstrap checks failed
[1]: max file descriptors [65535] for elasticsearch process is too low, increase to at least [65536]
[2]: max virtual memory areas vm.max_map_count [65530] is too low, increase to at least [262144]
解决:
sudo vim /etc/security/limits.conf
修改 65535 -> 65536 (或更大)
sudo vim /etc/sysctl.conf
添加或修改
vm.max_map_count=262144
然后查看是否生效:
sudo sysctl -p
然后退出 ssh 再重新登录
Kibana
下载 直接解压即可使用.
主要是配置好 ES 的地址就好. (建议与ES一样, 绑定本机局域网地址, 然后通过 nginx 代理访问)
配置
配置
要监听的地址
server.host: "10.161.29.39"
要修改为指向ES的主机
elasticsearch.url: "http://10.161.29.39:9200"
Filebeat
下载
https://www.elastic.co/guide/en/beats/filebeat/current/filebeat-installation.html
修改配置文件
为不同的类型日志, 加上不同的标识
添加个自定义的字段:
- type: log
enabled: true
paths:
- /home/company/nginx/nginx-1.9.3-dsp/logs/*.log
exclude_lines: [' 200 ']
#include_lines: ['^ERR', '^WARN']
#exclude_files: ['.gz$']
fields:
modules: ad-monitor
其中 fields
下面的是根据自己需求, 来为该 log 的数据添加额外的字段(比如, 用来标识这某类日志, 方便在 es 里根据该字段来进行搜索)
include
表示日志行(按行为单位)包含该字符串的数据.
exclude
表示日志行不能包含该字符串的数据.
它们都支持正则.比如要查询 [ERROR]
这种 java logback 日志的数据. 可以这样子:
include: ['\[ERROR\]']
注意, 是用单引号来括住正则表达式的.
配置不同的索引名
output.elasticsearch:
hosts: ["http://localhost:9200"]
index: "filebeat-%{[beat.version]}-%{+yyyy.MM.dd}"
启动
sudo ./filebeat -e -c filebeat.yml
整合
启动完以上三个系统, 就应该可以从 Kibana 里(直接访问Kibana的服务器地址URL) 看到日志被不断地收集进来了.
创建仪表盘
步骤如下
- 先在 Discover 里创建一个搜索语句, 然后保存该搜索语句(类似 DB 的创建视图)
- 然后在 Visualize 里, 创建一个视图, 然后视图里选择 步骤1 所保存的搜索语句
- 再次在 Visualize 里, 保存好该视图的配置(X, Y轴等的设置信息)
- 最后, 在 Dashboard 里, 创建一个仪表盘, 然后编辑 -> 添加视图, 选择 步骤2,3 所保存好的视图名即可.
报警
由于官方的插件 x pack 需要购买许可证. 所以…你懂的.
现在的开源项目的方案, 又没跟上最新版本的 ELK , 所以想到的一个解决办法就是, 使用 python + ES 查询 + 条数 来进行预警(使用企业微信). 代码如下, 这个可根据自己的实际需求来修改.
微信发送消息代码
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import requests
corpid = "你的corpid"
corpsecret = "你的企业应用的corpsecret"
agentid = 你的部门ID
token_url = "https://qyapi.weixin.qq.com/cgi-bin/gettoken?corpid={}&corpsecret={}"
send_msg_url = "https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token={}"
formated_token_url = token_url.format(corpid, corpsecret)
def get_token():
r = requests.get(formated_token_url)
return r.json()['access_token']
def send_msg(msg, token):
formated_send_msg_url = send_msg_url.format(token)
resp = requests.post(formated_send_msg_url, json={
"touser": "@all",
"toparty": "",
"totag": "",
"msgtype": "text",
"agentid": agentid,
"text": {
"content": msg
},
"safe": 0
})
print("resp")
print(resp.json())
实际监控代码
# -*- coding: utf-8 -*-
from elasticsearch import Elasticsearch
import wx
import schedule
import time
import functools
es = Elasticsearch(['http://你的ES地址:9200'])
# 阀值
ProblemThreshold = 3
web_nginx_prefix = "/home/company/nginx/nginx-1.9.3-dsp/logs/"
ad_nginx_log_paths = ('ad-monitor.log', 'ad-click.log', 'ad-win-request.log')
dsp_nginx_log_paths = ('biz-monitor.log', 'biz-click.log', 'dsp-win-request.log')
nginx_log_paths = ad_nginx_log_paths + dsp_nginx_log_paths
# schedules 或 web tomcat 这里是 filebeat 的自定义字段 "fields.level = ad-sch 或 fields.level = dsp-sch"
ad_sch_id = "ad-sch"
dsp_sch_id = "dsp-sch"
ad_web_id = "ad-web"
dsp_web_id = "dsp-web"
tomcat_ids = [ad_sch_id, ad_web_id, dsp_sch_id, dsp_web_id]
def catch_exceptions(cancel_on_failure=False):
def actual_decorator(job_func):
@functools.wraps(job_func)
def wrapper(*args, **kwargs):
try:
print('LOG: Running job "%s"' % job_func.__name__)
result = job_func(*args, **kwargs)
print('LOG: Job "%s" completed' % job_func.__name__)
return result
except:
import traceback
print(traceback.format_exc())
if cancel_on_failure:
return schedule.CancelJob
return wrapper
return actual_decorator
# unit 参考 ES 的语法, 单位为分钟. 表示最近 unit 分钟内, probleam_threshold 表示有问题的数量
# https://www.elastic.co/guide/en/elasticsearch/reference/current/common-options.html#date-math
@catch_exceptions(cancel_on_failure=True)
def check(unit="1m", problem_threshold=50):
wx_token = wx.get_token()
for path in nginx_log_paths:
resp = es.search(index="filebeat-*", body={
"from": 0,
"size": 0,
"query": {
"bool": {
"must": [
{
"query_string": {
"query": "message:(-200) AND source:(\"" + web_nginx_prefix + path + "\")",
"analyze_wildcard": True,
"default_field": "*"
}
},
{
"range": {
"@timestamp": {
"gte": "now-" + unit
}
}
}
]
}
}
})
# 有问题的数量
problem_count = resp['hits']['total']
if problem_count > problem_threshold:
msg = "[ERROR] {} => {} 内有超过 {} 个异常(非200)数据.阀值为 {}".format(path, unit, problem_count, problem_threshold)
wx.send_msg(msg, wx_token)
# else:
# msg = "{} => {} 分钟内数据正常. 阀值为 {}".format(path, unit, probleam_threshold)
# wx.send_msg(msg, wx_token)
@catch_exceptions(cancel_on_failure=True)
def report(unit):
wx_token = wx.get_token()
for path in nginx_log_paths:
resp = es.search(index="filebeat-*", body={
"from": 0,
"size": 0,
"query": {
"bool": {
"must": [
{
"query_string": {
"query": "message:(-200) AND source:(\"" + web_nginx_prefix + path + "\")",
"analyze_wildcard": True,
"default_field": "*"
}
},
{
"range": {
"@timestamp": {
"gte": "now-" + unit
}
}
}
]
}
}
})
# 有问题的数量
problem_count = resp['hits']['total']
wx.send_msg("[INFO] {} 最近 {} 内, 一共有 {} 个异常数据".format(path, unit, problem_count), wx_token)
@catch_exceptions(cancel_on_failure=True)
def tomcat_check(unit="1m", problem_threshold=50):
wx_token = wx.get_token()
for tomcatid in tomcat_ids:
resp = es.search(index="filebeat-*", body={
"from": 0,
"size": 0,
"query": {
"bool": {
"must": [
{
"query_string": {
"query": "fields.level:(\"" + tomcatid + "\")",
"analyze_wildcard": True,
"default_field": "*"
}
},
{
"range": {
"@timestamp": {
"gte": "now-" + unit
}
}
}
]
}
}
})
# 有问题的数量
problem_count = resp['hits']['total']
if problem_count > problem_threshold:
msg = "[ERROR] {} => {} 内有超过 {} 个异常(ERROR 日志)数据.阀值为 {}".format(tomcatid, unit, problem_count, problem_threshold)
wx.send_msg(msg, wx_token)
# else:
# msg = "{} => {} 分钟内数据正常. 阀值为 {}".format(path, unit, probleam_threshold)
# wx.send_msg(msg, wx_token)
@catch_exceptions(cancel_on_failure=True)
def tomcat_report(unit):
wx_token = wx.get_token()
for tomcatid in tomcat_ids:
resp = es.search(index="filebeat-*", body={
"from": 0,
"size": 0,
"query": {
"bool": {
"must": [
{
"query_string": {
"query": "fields.level:(\"" + tomcatid + "\")",
"analyze_wildcard": True,
"default_field": "*"
}
},
{
"range": {
"@timestamp": {
"gte": "now-" + unit
}
}
}
]
}
}
})
# 有问题的数量
problem_count = resp['hits']['total']
wx.send_msg("[INFO] {} 最近 {} 内, 一共有 {} 个异常数据".format(tomcatid, unit, problem_count), wx_token)
# 每分钟检测一次, 最近一分钟内 如果 > 50 个异常, 则报警
schedule.every(1).minutes.do(check, unit="1m", problem_threshold=50)
schedule.every(1).hours.do(check, unit="1h", problem_threshold=500)
schedule.every(6).hours.do(check, unit="6h", problem_threshold=1000)
schedule.every().day.at("00:00").do(report, unit="24h")
schedule.every().day.at("12:00").do(report, unit="12h")
# tomcat_ids
schedule.every(1).minutes.do(tomcat_check, unit="1m", problem_threshold=50)
schedule.every(1).hours.do(tomcat_check, unit="1h", problem_threshold=500)
schedule.every(6).hours.do(tomcat_check, unit="6h", problem_threshold=1000)
schedule.every().day.at("00:00").do(tomcat_report, unit="24h")
schedule.every().day.at("12:00").do(tomcat_report, unit="12h")
while True:
schedule.run_pending()
time.sleep(1)
请根据相应的 ES 语法及实际情况, 来进行修改查询语句. 例如, 我上面的 fields.level
这些, 是在 filebeat 里的 fields
下面添加了 level
字段来区分不同类型的日志文件的.
setup.py 依赖配置
from setuptools import setup
setup(
name='wx-es-robot',
version='1.0',
packages=['robot'],
url='',
license='',
author='emacsist',
author_email='emacsist@qq.com',
description='',
install_requires=['elasticsearch', 'requests', 'schedule']
)
有其他问题, 可以 email 我.