1、脚本实现的功能
通过指定选项,从Elasticsearch中获取对应Index的数据,并根据事先设定的好的模式,输出对应模式的数据结果。
2、脚本实现的原理
对Elasticsearch的数据查询,有很多种方式,而且结合聚合或者其他条件,可以组合非常复杂的查询表达式,查询非常丰富和多样化的结果,而这种实现,再怎么说kibana都需要在界面进行各种条件设定和配置,而如果仅仅是通过shell脚本来传递参数,是很难达到这种结果的。
所以,只能把事先需要查询的模式或者已知的查询表达式写到shell脚本中,然后通过选项来查询和调用对应的查询模式。
而这个脚本就是将需要查询的组合表达式封装到一个个函数里面,通过case调用,如果后续需要追加的话,添加查询函数既可,不用改变脚本的结构。
3、脚本使用说明
-i:索引名称,以通配符进行匹配
-t:查询时间,开始时间和结束时间,格式如:"2018-09-12_10:43:27 2018-09-14_10:53:17"
-k:keyword,关键字名称
-f:filed,字段名称
-q:查询模式,可以通过这个参数指定查询模式,目前已经添加了两种查询模式(simple_query,aggs_avg),后续有需要追加的情况,只要增加对应的函数既可
4、脚本使用实例
关键字精确查询:如果有多个字段,以“,”号分割
#./elk.sh -i ef -t "2018-09-12_10:43:27 2018-09-14_10:53:17" -q simple_query -k level=notice #./elk.sh -i ef -t "2018-09-12_10:43:27 2018-09-14_10:53:17" -q simple_query -k level=notice,dstip=183.214.145.109
聚合平均值查询:
-f 后面添加的第一个参数为聚合的字段,从第二个参数开始,计算平局值,如下所示:
按照tranip字段进行分类,然后计算每个tranip的srcport的平均值,并且按照平均值从大到小排列
#./elk.sh -i ef -t "2018-09-12_10:43:27 2018-09-14_10:53:17" -q aggs_avg -f tranip,srcport #./elk.sh -i ef -t "2018-09-12_10:43:27 2018-09-14_10:53:17" -q aggs_avg -f tranip,srcport,dstport
5、脚本内容如下
#!/bin/bash ES_User= ES_Passord= ES_ADDR=http://ES_IP:9200 Time=`date "+%F_%T"` Log_Dir=/tmp/elk_search.log while getopts "i:t:k:q:f:" opt; do case $opt in i) Indexes=$OPTARG if [ -z $ES_User ];then curl -s -XGET ${ES_ADDR}/_cat/indices |gawk '{print $3}'|grep ^$Indexes > /dev/null 2>> ${Log_Dir} if [ $? -ne 0 ];then echo "${Time} $Indexes Index is not exist!" >> ${Log_Dir} exit 1; fi else curl -s -XGET -u ${ES_User}:${ES_Passord} ${ES_ADDR}/_cat/indices |gawk '{print $3}'|grep ^$Indexes > /dev/null 2 >> ${Log_Dir} if [ $? -ne 0 ];then echo "${Time} $Indexes Index is not exist!" >> ${Log_Dir} exit 1; fi fi ;; t) Time_Range=$OPTARG Time_Rang_Begin=`echo ${Time_Range}| sed 's/_/ /g'|gawk '{print $1" "$2}'` Time_Rang_End=`echo ${Time_Range}| sed 's/_/ /g'|gawk '{print $3" "$4}'` Time_Rang_End_Format=`date +%s -d "${Time_Rang_End}"` Time_Rang_Begin_Format=`date +%s -d "${Time_Rang_Begin}"` if echo ${Time_Range}|grep -Eq "[0-9]{4}-[0-9]{2}-[0-9]{2}_[0-9]{2}:[0-9]{2}:[0-9]{2} [0-9]{4}-[0-9]{2}-[0-9]{2}_[0-9]{2}:[0-9]{2}:[0-9]{2}" && date -d "${Time_Rang_Begin}" +"%F %T" > /dev/null 2>&1 && date -d "${Time_Rang_End}" +"%F %T" > /dev/null 2>&1;then Time_Rang_Diff=$[${Time_Rang_End_Format} - ${Time_Rang_Begin_Format}] if [ ${Time_Rang_Diff} -lt 0 ];then echo "${Time} The start time must be before the end time!" >> ${Log_Dir} exit 1; fi else echo "${Time} Time format is not OK!" >> ${Log_Dir} exit 1; fi ;; k) Query_Keyword=$OPTARG ;; f) Query_Fileds=$OPTARG ;; q) Query_Style=$OPTARG ;; \?) echo "Invalid option: $OPTARG" >> ${Log_Dir} ;; esac done simple_query(){ Num_Fields=`echo ${Query_Keyword} |gawk -F"," '{print NF}'` for ((i=1;i<=${Num_Fields};i++)); do Field[$i]=`echo ${Query_Keyword} |gawk -v i=$i -F"," '{print $i}'|gawk -F"=" '{print $1}'` Keyword[$i]=`echo ${Query_Keyword} |gawk -v i=$i -F"," '{print $i}'|gawk -F"=" '{print $2}'` Query_Template[$i]=`echo ,{\"match_phrase\": {\"${Field[$i]}\": {\"query\": \"${Keyword[$i]}\"}}}` Query_Template_All="${Query_Template_All}${Query_Template[$i]}" done Query_Main="{\"size\": 10000,\"query\":{\"bool\":{\"must\":[{\"match_all\":{}}${Query_Template_All},{\"range\":{\"@timestamp\":{\"gte\":${Time_Rang_Begin_Format},\"lte\":${Time_Rang_End_Format},\"format\":\"epoch_second\"}}}]}}}" curl -H "Content-Type: application/json" -XGET ${ES_ADDR}/${Indexes}*/_search -d"${Query_Main}" -s | python -m json.tool } aggs_avg(){ Num_Fields=`echo ${Query_Fileds} |gawk -F"," '{print NF}'` for ((i=1;i<=${Num_Fields};i++)); do Field[$i]=`echo ${Query_Fileds} |gawk -v i=$i -F"," '{print $i}'` #echo ${Aggs_Terms} if [ $i -ge 2 ];then Aggs_Avg[$i]=`echo ,\"$[${i}]\":{\"avg\":{\"field\":\"${Field[$i]}\"}}` Aggs_Avg_All="${Aggs_Avg_All}${Aggs_Avg[$i]}" else Aggs_Terms=",\"aggs\":{\"${i}\":{\"terms\":{\"field\":\"${Field[$i]}.keyword\",\"size\":100000}}}" fi done if [ ${Num_Fields} -eq 1 ];then Query_Main="{\"size\":10000,\"_source\":{\"excludes\":[]}${Aggs_Terms},\"stored_fields\":[\"*\"],\"script_fields\":{},\"docvalue_fields\":[\"@timestamp\",\"date\"],\"query\":{\"bool\":{\"must\":[{\"match_all\":{}},{\"range\":{\"@timestamp\":{\"gte\":${Time_Rang_Begin_Format},\"lte\":${Time_Rang_End_Format},\"format\":\"epoch_second\"}}}],\"filter\":[],\"should\":[],\"must_not\":[]}}}" curl -H "Content-Type: application/json" -XGET ${ES_ADDR}/${Indexes}*/_search -d"${Query_Main}" -s | python -m json.tool else Aggs_Terms="${Aggs_Terms%\}\}\}},\"order\":{\"2\":\"desc\"}}" Aggs_Avg_All=",\"aggs\":{${Aggs_Avg_All#,}}" Query_Main="{\"size\":0,\"_source\":{\"excludes\":[]}${Aggs_Terms}${Aggs_Avg_All}}},\"stored_fields\":[\"*\"],\"script_fields\":{},\"docvalue_fields\":[\"@timestamp\",\"date\"],\"query\":{\"bool\":{\"must\":[{\"match_all\":{}},{\"range\":{\"@timestamp\":{\"gte\":${Time_Rang_Begin_Format},\"lte\":${Time_Rang_End_Format},\"format\":\"epoch_second\"}}}],\"filter\":[],\"should\":[],\"must_not\":[]}}}" curl -H "Content-Type: application/json" -XGET ${ES_ADDR}/${Indexes}*/_search -d"${Query_Main}" -s | python -m json.tool fi } case $Query_Style in simple_query) simple_query ;; aggs_avg) aggs_avg ;; *) echo "${Time} There is no appropriate query type!" >> ${Log_Dir} exit 1 ;; esac
6、脚本Tips
getopts是SHELL脚本内置的功能,通过它可以给脚本设定选项然后传递参数,传递的参数通过变量$OPTARG赋值然后调用,选项后面如果有“:”号,则表示此选项必须带参数,用法如下:
while "i:t:k:q:f:" opt; do case $opt in i) Indexes=$OPTARG ;; t) Time_Range=$OPTARG ;; k) Query_Keyword=$OPTARG ;; f) Query_Fileds=$OPTARG ;; q) Query_Style=$OPTARG ;; \?) echo "Invalid option: $OPTARG" >> ${Log_Dir} ;; esac done