ES

依赖: >=JDK8

下载完直接解压即可.

开启外网IP访问

vim config/elasticsearch.yml

修改 network.host 为 
network.host: 0.0.0.0

一般不直接使用外网配置, 可以使用 nginx 代理, 然后在Nginx 加个简单的认证配置即可. 建议直接写明局域网的IP地址, 然后通过 nginx 代理即可.

访问地址: IP:9200

nginx 代理 + 认证

nginx 配置

        location / {
		auth_basic "欢迎来到监控中心 ^_^";
                auth_basic_user_file /home/company/nginx/nginx-1.12.2/conf/htpasswd;
		proxy_pass http://10.161.29.39:5601;
                proxy_redirect off;
                proxy_set_header Host $host;
                proxy_set_header X-Real-IP $remote_addr;
                proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
                client_max_body_size 200m;
                client_body_buffer_size 512k;
                proxy_connect_timeout 90;
                proxy_send_timeout 90;
                proxy_read_timeout 90;
                proxy_buffer_size 256k;
                proxy_buffers 4 256k;
                proxy_busy_buffers_size 512k;
                proxy_temp_file_write_size 512k;
	}
生成密码:
openssl passwd -crypt helloworld
Warning: truncating password to 8 characters

sOX8YFAkZbi7M


yourusername:sOX8YFAkZbi7M

reload 完 nginx 后, 就可以用 用户名 yourusername, 密码为 helloworld 来登录了

启动报错

ERROR: [2] bootstrap checks failed
[1]: max file descriptors [65535] for elasticsearch process is too low, increase to at least [65536]
[2]: max virtual memory areas vm.max_map_count [65530] is too low, increase to at least [262144]

解决:

sudo vim /etc/security/limits.conf
修改 65535 -> 65536 (或更大)
sudo vim /etc/sysctl.conf
添加或修改
vm.max_map_count=262144

然后查看是否生效:
sudo sysctl -p

然后退出 ssh 再重新登录

Kibana

下载 直接解压即可使用.

主要是配置好 ES 的地址就好. (建议与ES一样, 绑定本机局域网地址, 然后通过 nginx 代理访问)

配置

配置


要监听的地址
server.host: "10.161.29.39"

要修改为指向ES的主机
elasticsearch.url: "http://10.161.29.39:9200"

Filebeat

下载

download

https://www.elastic.co/guide/en/beats/filebeat/current/filebeat-installation.html

修改配置文件

为不同的类型日志, 加上不同的标识

添加个自定义的字段:

- type: log
  enabled: true
  paths:
    - /home/company/nginx/nginx-1.9.3-dsp/logs/*.log
  exclude_lines: [' 200 ']
  #include_lines: ['^ERR', '^WARN']
  #exclude_files: ['.gz$']
  fields:
    modules: ad-monitor

其中 fields 下面的是根据自己需求, 来为该 log 的数据添加额外的字段(比如, 用来标识这某类日志, 方便在 es 里根据该字段来进行搜索)

include 表示日志行(按行为单位)包含该字符串的数据. exclude 表示日志行不能包含该字符串的数据.

它们都支持正则.比如要查询 [ERROR] 这种 java logback 日志的数据. 可以这样子:

include: ['\[ERROR\]']

注意, 是用单引号来括住正则表达式的.

配置不同的索引名

output.elasticsearch:
  hosts: ["http://localhost:9200"]
  index: "filebeat-%{[beat.version]}-%{+yyyy.MM.dd}"

启动

sudo ./filebeat -e -c filebeat.yml

整合

启动完以上三个系统, 就应该可以从 Kibana 里(直接访问Kibana的服务器地址URL) 看到日志被不断地收集进来了.

创建仪表盘

步骤如下

  1. 先在 Discover 里创建一个搜索语句, 然后保存该搜索语句(类似 DB 的创建视图)
  2. 然后在 Visualize 里, 创建一个视图, 然后视图里选择 步骤1 所保存的搜索语句
  3. 再次在 Visualize 里, 保存好该视图的配置(X, Y轴等的设置信息)
  4. 最后, 在 Dashboard 里, 创建一个仪表盘, 然后编辑 -> 添加视图, 选择 步骤2,3 所保存好的视图名即可.

报警

由于官方的插件 x pack 需要购买许可证. 所以…你懂的.

现在的开源项目的方案, 又没跟上最新版本的 ELK , 所以想到的一个解决办法就是, 使用 python + ES 查询 + 条数 来进行预警(使用企业微信). 代码如下, 这个可根据自己的实际需求来修改.

微信发送消息代码

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import requests


corpid = "你的corpid"
corpsecret = "你的企业应用的corpsecret"
agentid = 你的部门ID

token_url = "https://qyapi.weixin.qq.com/cgi-bin/gettoken?corpid={}&corpsecret={}"
send_msg_url = "https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token={}"

formated_token_url = token_url.format(corpid, corpsecret)


def get_token():
    r = requests.get(formated_token_url)
    return r.json()['access_token']


def send_msg(msg, token):
    formated_send_msg_url = send_msg_url.format(token)
    resp = requests.post(formated_send_msg_url, json={
        "touser": "@all",
        "toparty": "",
        "totag": "",
        "msgtype": "text",
        "agentid": agentid,
        "text": {
            "content": msg
        },
        "safe": 0
    })
    print("resp")
    print(resp.json())

实际监控代码

# -*- coding: utf-8 -*-

from elasticsearch import Elasticsearch

import wx
import schedule
import time
import functools

es = Elasticsearch(['http://你的ES地址:9200'])

# 阀值
ProblemThreshold = 3

web_nginx_prefix = "/home/company/nginx/nginx-1.9.3-dsp/logs/"
ad_nginx_log_paths = ('ad-monitor.log', 'ad-click.log', 'ad-win-request.log')
dsp_nginx_log_paths = ('biz-monitor.log', 'biz-click.log', 'dsp-win-request.log')

nginx_log_paths = ad_nginx_log_paths + dsp_nginx_log_paths

# schedules 或 web tomcat 这里是 filebeat 的自定义字段 "fields.level = ad-sch 或 fields.level = dsp-sch"
ad_sch_id = "ad-sch"
dsp_sch_id = "dsp-sch"
ad_web_id = "ad-web"
dsp_web_id = "dsp-web"

tomcat_ids = [ad_sch_id, ad_web_id, dsp_sch_id, dsp_web_id]


def catch_exceptions(cancel_on_failure=False):
    def actual_decorator(job_func):
        @functools.wraps(job_func)
        def wrapper(*args, **kwargs):
            try:
                print('LOG: Running job "%s"' % job_func.__name__)
                result = job_func(*args, **kwargs)
                print('LOG: Job "%s" completed' % job_func.__name__)
                return result
            except:
                import traceback
                print(traceback.format_exc())
                if cancel_on_failure:
                    return schedule.CancelJob

        return wrapper

    return actual_decorator


# unit 参考 ES 的语法, 单位为分钟. 表示最近 unit 分钟内, probleam_threshold 表示有问题的数量
# https://www.elastic.co/guide/en/elasticsearch/reference/current/common-options.html#date-math
@catch_exceptions(cancel_on_failure=True)
def check(unit="1m", problem_threshold=50):
    wx_token = wx.get_token()
    for path in nginx_log_paths:
        resp = es.search(index="filebeat-*", body={
            "from": 0,
            "size": 0,
            "query": {
                "bool": {
                    "must": [
                        {
                            "query_string": {
                                "query": "message:(-200) AND source:(\"" + web_nginx_prefix + path + "\")",
                                "analyze_wildcard": True,
                                "default_field": "*"
                            }
                        },
                        {
                            "range": {
                                "@timestamp": {
                                    "gte": "now-" + unit
                                }
                            }
                        }
                    ]
                }
            }
        })

        # 有问题的数量
        problem_count = resp['hits']['total']
        if problem_count > problem_threshold:
            msg = "[ERROR] {} => {} 内有超过 {} 个异常(非200)数据.阀值为 {}".format(path, unit, problem_count, problem_threshold)
            wx.send_msg(msg, wx_token)
        # else:
        #     msg = "{} => {} 分钟内数据正常. 阀值为 {}".format(path, unit, probleam_threshold)
        #     wx.send_msg(msg, wx_token)


@catch_exceptions(cancel_on_failure=True)
def report(unit):
    wx_token = wx.get_token()
    for path in nginx_log_paths:
        resp = es.search(index="filebeat-*", body={
            "from": 0,
            "size": 0,
            "query": {
                "bool": {
                    "must": [
                        {
                            "query_string": {
                                "query": "message:(-200) AND source:(\"" + web_nginx_prefix + path + "\")",
                                "analyze_wildcard": True,
                                "default_field": "*"
                            }
                        },
                        {
                            "range": {
                                "@timestamp": {
                                    "gte": "now-" + unit
                                }
                            }
                        }
                    ]
                }
            }
        })

        # 有问题的数量
        problem_count = resp['hits']['total']
        wx.send_msg("[INFO] {} 最近 {} 内, 一共有 {} 个异常数据".format(path, unit, problem_count), wx_token)


@catch_exceptions(cancel_on_failure=True)
def tomcat_check(unit="1m", problem_threshold=50):
    wx_token = wx.get_token()
    for tomcatid in tomcat_ids:
        resp = es.search(index="filebeat-*", body={
            "from": 0,
            "size": 0,
            "query": {
                "bool": {
                    "must": [
                        {
                            "query_string": {
                                "query": "fields.level:(\"" + tomcatid + "\")",
                                "analyze_wildcard": True,
                                "default_field": "*"
                            }
                        },
                        {
                            "range": {
                                "@timestamp": {
                                    "gte": "now-" + unit
                                }
                            }
                        }
                    ]
                }
            }
        })

        # 有问题的数量
        problem_count = resp['hits']['total']
        if problem_count > problem_threshold:
            msg = "[ERROR] {} => {} 内有超过 {} 个异常(ERROR 日志)数据.阀值为 {}".format(tomcatid, unit, problem_count, problem_threshold)
            wx.send_msg(msg, wx_token)
        # else:
        #     msg = "{} => {} 分钟内数据正常. 阀值为 {}".format(path, unit, probleam_threshold)
        #     wx.send_msg(msg, wx_token)


@catch_exceptions(cancel_on_failure=True)
def tomcat_report(unit):
    wx_token = wx.get_token()
    for tomcatid in tomcat_ids:
        resp = es.search(index="filebeat-*", body={
            "from": 0,
            "size": 0,
            "query": {
                "bool": {
                    "must": [
                        {
                            "query_string": {
                                "query": "fields.level:(\"" + tomcatid + "\")",
                                "analyze_wildcard": True,
                                "default_field": "*"
                            }
                        },
                        {
                            "range": {
                                "@timestamp": {
                                    "gte": "now-" + unit
                                }
                            }
                        }
                    ]
                }
            }
        })

        # 有问题的数量
        problem_count = resp['hits']['total']
        wx.send_msg("[INFO] {} 最近 {} 内, 一共有 {} 个异常数据".format(tomcatid, unit, problem_count), wx_token)


# 每分钟检测一次, 最近一分钟内 如果 > 50 个异常, 则报警
schedule.every(1).minutes.do(check, unit="1m", problem_threshold=50)
schedule.every(1).hours.do(check, unit="1h", problem_threshold=500)
schedule.every(6).hours.do(check, unit="6h", problem_threshold=1000)

schedule.every().day.at("00:00").do(report, unit="24h")
schedule.every().day.at("12:00").do(report, unit="12h")

# tomcat_ids
schedule.every(1).minutes.do(tomcat_check, unit="1m", problem_threshold=50)
schedule.every(1).hours.do(tomcat_check, unit="1h", problem_threshold=500)
schedule.every(6).hours.do(tomcat_check, unit="6h", problem_threshold=1000)

schedule.every().day.at("00:00").do(tomcat_report, unit="24h")
schedule.every().day.at("12:00").do(tomcat_report, unit="12h")

while True:
    schedule.run_pending()
    time.sleep(1)

请根据相应的 ES 语法及实际情况, 来进行修改查询语句. 例如, 我上面的 fields.level 这些, 是在 filebeat 里的 fields 下面添加了 level 字段来区分不同类型的日志文件的.

setup.py 依赖配置

from setuptools import setup

setup(
    name='wx-es-robot',
    version='1.0',
    packages=['robot'],
    url='',
    license='',
    author='emacsist',
    author_email='emacsist@qq.com',
    description='',
    install_requires=['elasticsearch', 'requests', 'schedule']
)

有其他问题, 可以 email 我.