陋室铭


  • 首页

  • 分类

  • 归档

  • 标签
陋室铭

Centos7安装TICK套装进行系统监控

发表于 2018-07-06 | 分类于 TICK | 阅读次数

添加 TICK 套装 Repository

创建文件/etc/yum.repos.d/influxdata.repo

1
sudo vi /etc/yum.repos.d/influxdata.repo

编辑内容

1
2
3
4
5
6
[influxdb]
name = InfluxDB Repository - RHEL \$releasever
baseurl = https://repos.influxdata.com/rhel/\$releasever/\$basearch/stable
enabled = 1
gpgcheck = 1
gpgkey = https://repos.influxdata.com/influxdb.key

安装InfulxDB

1
2
3
4
5
sudo yum install influxdb

或

docker pull influxdb

启动

1
sudo systemctl start influxdb

查看是否正常运行

1
2
3
4
5
6
7
8
9
10
systemctl status influxdb

● influxdb.service - InfluxDB is an open-source, distributed, time series database
Loaded: loaded (/usr/lib/systemd/system/influxdb.service; enabled; vendor preset: disabled)
Active: active (running) since 五 2018-07-06 14:08:56 CST; 17s ago
Docs: https://docs.influxdata.com/influxdb/
Main PID: 1879 (influxd)
Memory: 6.2M
CGroup: /system.slice/influxdb.service
└─1879 /usr/bin/influxd -config /etc/influxdb/influxdb.conf

启动influxdb控制台

1
influx

创建一个admin用户,作为管理用户

1
create user "admin" with password 'admin' with all privileges

验证是否已创建用户:

1
2
3
4
5
show users

user admin
---- -----
admin true

退出控制台

1
exit

修改/etc/influxdb/influxdb.conf,这是influxdb的配置文件

1
2
3
4
5
6
7
[http]
# Determines whether HTTP endpoint is enabled.
# enabled = true
# The bind address used by the HTTP service.
# bind-address = ":8086"
# Determines whether HTTP authentication is enabled.
auth-enabled = true

保存文件,重启influxdb服务

1
sudo systemctl restart influxdb

infulxdb已配置,安装Telegraf

安装和配置Telegraf

1
2
3
4
5
sudo yum install telegraf

或

docker pull telegraf

Telegraf使用插件来输入和输出数据,默认输出插件用于influxdb。由于我们启用了influxdb的身份认证,
我们修改Telegraf的配置文件以指定我们配置的用户和密码。

1
sudo vi /etc/telegraf/telegraf.conf

找到[outputs.influxdb]部分提供用户名和密码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
[[outputs.influxdb]]
## The full HTTP or UDP endpoint URL for your InfluxDB instance.
## Multiple urls can be specified as part of the same cluster,
## this means that only ONE of the urls will be written to each interval.
# urls = ["udp://localhost:8089"] # UDP endpoint example
urls = ["http://localhost:8086"] # required
## The target database for metrics (telegraf will create it if not exists).
database = "telegraf" # required

...

## Write timeout (for the InfluxDB client), formatted as a string.
## If not provided, will default to 5s. 0s means no timeout (not recommended).
timeout = "5s"
username = "sammy"
password = "sammy_admin"
## Set the user agent for HTTP POSTs (can be useful for log differentiation)
# user_agent = "telegraf"
## Set UDP payload size, defaults to InfluxDB UDP Client default (512 bytes)
# udp_payload = 512

保存文件,退出编辑器,然后启动Telegraf:

1
sudo systemctl start telegraf

然后检查服务是否正常运行

1
2
3
4
5
6
7
8
9
10
systemctl status telegraf

● telegraf.service - The plugin-driven server agent for reporting metrics into InfluxDB
Loaded: loaded (/usr/lib/systemd/system/telegraf.service; enabled; vendor preset: disabled)
Active: active (running) since 五 2018-07-06 14:33:23 CST; 18s ago
Docs: https://github.com/influxdata/telegraf
Main PID: 2252 (telegraf)
Memory: 10.8M
CGroup: /system.slice/telegraf.service
└─2252 /usr/bin/telegraf -config /etc/telegraf/telegraf.conf -config-directory /et...

Telegraf现在正在收集数据并将其写入influxdb。登录influxdb的控制台

1
influx -username 'admin' -password 'admin'

登录后,执行以下命令查看可用的数据库

1
show databases

查看输出的数据库:

1
2
3
4
5
name: databases
name
----
_internal
telegraf

查看telegraf数据库

1
user telegraf

显示Telegraf通过执行此命令收集的各种测量

1
2
3
4
5
6
7
8
9
10
11
12
13
show measurements

name: measurements
name
----
cpu
disk
diskio
kernel
mem
processes
swap
system

在终端窗口中运行telegraf -usage plugin-name,可以查看每个输入插件的使用说明。

安装Chronograf

Chronograf是一个数据可视化软件,包含控制台,报表。同时集成了Kapacitor的报警功能

1
2
3
4
sudo yum install chronograf

或
docker pull quay.io/influxdb/chronograf:1.5.0.1

启动

1
sudo systemctl start chronograf

关闭防火墙或者设置防火墙

1
2
3
4
5
6
firewall-cmd --state //查看防火墙状态

systemctl status firewalld.service //查看防火墙状态

systemctl stop firewalld.service //关闭防火墙
systemctl disable firewalld.service //禁止防火墙开机启动

访问 http://ip:8888 Chronograf界面,输入influxdb数据库的用户名和密码,然后单击连接数据源。
单击host列表,可以看到主机的一系列图表。

安装Kapacitor

Kapacitor 是数据处理引擎,主要用来作为报警使用

1
2
3
4
5
6
7
8
sudo yum install kapacitor  //安装失败

或
wget https://dl.influxdata.com/kapacitor/releases/kapacitor-1.5.0.x86_64.rpm
sudo yum localinstall kapacitor-1.5.0.x86_64.rpm

或docker 安装
docker pull kapacitor

编辑文件 /etc/kapacitor/kapacitor.conf,定位到[[influxdb]]

1
2
3
4
5
6
7
8
9
10
11
12
13
# Multiple InfluxDB configurations can be defined.
# Exactly one must be marked as the default.
# Each one will be given a name and can be referenced in batch queries and InfluxDBOut nodes.
[[influxdb]]
# Connect to an InfluxDB cluster
# Kapacitor can subscribe, query and write to this cluster.
# Using InfluxDB is not required and can be disabled.
enabled = true
default = true
name = "localhost"
urls = ["http://localhost:8086"]
username = "admin"
password = "admin"

启动

1
2
sudo systemctl daemon-reload
sudo systemctl start kapacitor

验证Kapacitor是否在运行,使用以下命令:

1
kapacitor list tasks

如果Kapacitor启动并运行,您将看到一个空的任务列表,如下所示:

1
ID                            Type      Status    Executing Databases and Retention Policies

安装和配置Kapacitor后,我们安装TICK的用户界面组件,以便我们可以看到一些结果并配置一些警报。

配置警报

设置一个简单的警报,寻找高CPU使用率。 将鼠标悬停在左侧导航菜单上,找到ALERTING部分,然后点击Kapacitor Rules 。 然后单击创建新规则 。 在第一部分中,单击telegraf.autogen选择时间序列。 然后从显示的列表中选择系统 。 然后选择load1 。您将在下面的部分中立即看到相应的图表。 在图表上方,找到Load1大于的值为“ 发送警报”的字段,并为该值输入1.0 。 然后将以下文本粘贴到“ 警报消息”字段中以配置警报消息的文本:

1
{{ .ID }} is {{ .Level }} value: {{ index .Fields "value" }}

可以将鼠标悬停在“ 模板”部分中的条目上,以获取每个字段的说明。 然后从发送此警报到下拉列表中选择Smtp选项,并在相关字段中输入您的电子邮件地址。 默认情况下,您将以JSON格式接收邮件,如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
//示例消息
{
"Name":"system",
"TaskName":"chronograf-v1-50c67090-d74d-42ba-a47e-45ba7268619f",
"Group":"nil",
"Tags":{
"host":"centos-tick"
},
"ID":"TEST:nil",
"Fields":{
"value":1.25
},
"Level":"CRITICAL",
"Time":"2017-03-08T12:09:30Z",
"Message":"TEST:nil is CRITICAL value: 1.25"
}

您可以为邮件警报设置更多可供人读取的邮件。为此,请在将电子邮件正文文本放在此处占位符的文本框中输入消息。 您可以通过单击页面左上角的名称并输入新名称来重命名此规则。 最后,单击右上角的Save Rule以完成配置此规则。 要测试此新创建的警报,请使用dd命令从/dev/zero读取数据并将其发送到/dev/null ,以创建CPU尖峰:

1
dd if=/dev/zero of=/dev/null

让命令运行几分钟,这应该足以创建一个尖峰。您可以随时通过按CTRL+C停止命令。 过一会儿,您将收到一封电子邮件。此外,您还可以通过单击Chronograf用户界面左侧导航菜单中的警报历史记录来查看所有警报。 注意 :确认您可以接收快讯后,请务必停止使用CTRL+C启动的dd命令

陋室铭

Byteman使用教程

发表于 2018-07-06 | 分类于 Byteman | 阅读次数

简介

Byteman由JBoss出品。Byteman的代码插入能力相比BTrace而言更强,似乎可以在代码中任意的位置插入我们的跟踪代码(当然,你可能需要对Java代码生成、字节码技术有一定的了解),以及访问当前方法中变量的能力(包括方法参数、局部变量、甚至于调用其它函数的参数值、返回值等),而BTrace在这方面的能力要弱很多。

安装Byteman

官方下载
配置BYTEMAN_HOME

验证,

1
2
3
4
5
6
7
8
9
10
11
./bminstall.sh

usage: bminstall [-p port] [-h host] [-b] [-s] [-m] [-Dname[=value]]* pid
pid is the process id of the target JVM
-h host selects the host name or address the agent listener binds to
-p port selects the port the agent listener binds to
-b adds the byteman jar to the bootstrap classpath
-s sets an access-all-areas security policy for the Byteman agent code
-m activates the byteman JBoss modules plugin
-Dname=value can be used to set system properties whose name starts with “org.jboss.byteman.”
expects to find a byteman agent jar and byteman JBoss modules plugin jar (if -m is indicated) in BYTEMAN_HOME

使用示例1(读取局部变量)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
import java.io.BufferedReader;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStreamReader;

/**
* @author yangchj
* @email yangchj@icefire.me
* @date 2018/7/6
*/
public class BytemanDemo {

public static void main(String[] args) {
new BytemanDemo().start();
}

private void start(){
new Thread(()->{
DataInputStream in=new DataInputStream(System.in);
BufferedReader buf=new BufferedReader(new InputStreamReader(in));
try{
String next=buf.readLine();
while (next!=null&&!next.contains("end")){
consume(next);
next=buf.readLine();
}
}catch (IOException e){
e.printStackTrace();
}
}).start();
}

public void consume(String text){
final String arg=text;
Thread thread=new Thread(()-> System.out.println("program confirm " + arg));
thread.start();
try {
thread.join();
}catch (InterruptedException e){
e.printStackTrace();
}
}
}

启动程序 BytemanDemo

加载byteman到JVM中,并将attach到需要监听的进程上,指定byteman监控程序监听55000端口

1
2
3
4
5
6
jps

18592 Jps
20011 BytemanDemo

./bminstall.sh -b -Dorg.jboss.byteman.transform.all -Dorg.jboss.byteman.verbose -p 55000 20011

回到被监控的程序的控制台看到如下输出:

1
2
3
Setting org.jboss.byteman.transform.all=
Setting org.jboss.byteman.verbose=
TransformListener() : accepting requests on localhost:55000

从输出信息中可以看到,byteman监听55000端口,并在该端口接收请求,编写btm脚本,showLocalVar.btm,如下:

1
2
3
4
5
6
7
RULE trace line local var
CLASS BytemanDemo
METHOD consume(String)
AFTER WRITE $arg
IF TRUE
DO traceln("*** transfer value is : " + $arg + " ***")
ENDRULE

安装脚本吧,运行bmsubmit提交脚本

1
./bmsubmit -p 55000 -l showLocalVar.btm

再回控制台,输入测试语句“hhhhhhhhhhh”,查看输出

卸载脚本

1
./bmsubmit -p 55000 -u showLocalVar.btm

卸载完成,再回到被监控程序的控制台,输入测试语句,查看输出

陋室铭

elastalert监控日志报警

发表于 2018-07-03 | 分类于 elk | 阅读次数

简述

Elastalert是Yelp公司用python2写的一个报警框架

安装Elastalert(python2.6或2.7)

下载最新elastalert并安装

1
2
3
4
git clone https://github.com/Yelp/elastalert.git
sudo python setup.y install

sudo pip install -r requirement.txt

安装完成后,会在/usr/local/bin/ 下生成elastalert命令

1
2
3
4
5
6
7
//查看生成的elastalert命令
ls -l /usr/local/bin/elastalert*

-rwxr-xr-x 1 root root 396 2月 14 10:03 /usr/local/bin/elastalert
-rwxr-xr-x 1 root root 422 2月 14 10:03 /usr/local/bin/elastalert-create-index
-rwxr-xr-x 1 root root 430 2月 14 10:03 /usr/local/bin/elastalert-rule-from-kibana
-rwxr-xr-x 1 root root 416 2月 14 10:03 /usr/local/bin/elastalert-test-rule

设置索引

elastalert-create-index 命令创建elasticsearch 创建索引,默认情况下创建的索引是elastalert_status

1
./elastalert-create-index

设置配置文件和规则Rule

1
2
3
cp elastalert/config.yaml.example elastalert/config.yaml

vi elastalert/config.yaml

config.yaml中的内容

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
# This is the folder that contains the rule yaml files
# Any .yaml file will be loaded as a rule
rules_folder: example_rules

# How often ElastAlert will query Elasticsearch
# The unit can be anything from weeks to seconds
run_every:
minutes: 1

# ElastAlert will buffer results from the most recent
# period of time, in case some log sources are not in real time
buffer_time:
minutes: 15

# The Elasticsearch hostname for metadata writeback
# Note that every rule can have its own Elasticsearch host
es_host: localhost

# The Elasticsearch port
es_port: 9200

# The AWS region to use. Set this when using AWS-managed elasticsearch
#aws_region: us-east-1

# The AWS profile to use. Use this if you are using an aws-cli profile.
# See http://docs.aws.amazon.com/cli/latest/userguide/cli-chap-getting-started.html
# for details
#profile: test

# Optional URL prefix for Elasticsearch
#es_url_prefix: elasticsearch

# Connect with TLS to Elasticsearch
#use_ssl: True

# Verify TLS certificates
#verify_certs: True

# GET request with body is the default option for Elasticsearch.
# If it fails for some reason, you can pass 'GET', 'POST' or 'source'.
# See http://elasticsearch-py.readthedocs.io/en/master/connection.html?highlight=send_get_body_as#transport
# for details
#es_send_get_body_as: GET

# Option basic-auth username and password for Elasticsearch
#es_username: someusername
#es_password: somepassword

# Use SSL authentication with client certificates client_cert must be
# a pem file containing both cert and key for client
#verify_certs: True
#ca_certs: /path/to/cacert.pem
#client_cert: /path/to/client_cert.pem
#client_key: /path/to/client_key.key

# The index on es_host which is used for metadata storage
# This can be a unmapped index, but it is recommended that you run
# elastalert-create-index to set a mapping
writeback_index: elastalert_status

# If an alert fails for some reason, ElastAlert will retry
# sending the alert until this time period has elapsed
alert_time_limit:
days: 2

创建smtp_auth_file.yaml

1
2
3
4
#发送邮件的邮箱
user: xxx@163.com
#不是邮箱密码,是设置的POP3密码
password: xxx

创建example_frequency.yaml

参见creating-a-rules

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
# Alert when the rate of events exceeds a threshold

# (Optional)
# Elasticsearch host
es_host: localhost

# (Optional)
# Elasticsearch port
es_port: 9200

# (OptionaL) Connect with SSL to Elasticsearch
#use_ssl: True

# (Optional) basic-auth username and password for Elasticsearch
#es_username: someusername
#es_password: somepassword

# (Required)
# Rule name, must be unique
name: Example frequency rule

# (Required)
# Type of alert.
# the frequency rule type alerts when num_events events occur with timeframe time
type: frequency

# (Required)
# Index to search, wildcard supported
index: filebeat-*

# (Required, frequency specific)
# Alert when this many documents matching the query occur within a timeframe
num_events: 1

# (Required, frequency specific)
# num_events must occur within this amount of time to trigger an alert
timeframe:
minutes: 1

# (Required)
# A list of Elasticsearch filters used for find events
# These filters are joined with AND and nested in a filtered query
# For more info: http://www.elasticsearch.org/guide/en/elasticsearch/reference/current/query-dsl.html
filter:
- term:
errorDesc: "201"



#SMTP协议的邮件服务器相关配置
#smtp.163.com是网易163邮箱的smtp服务器
#登陆163邮箱后,找到 【设置】>【POP3/SMTP/IMAP】>开启,然后设置【客户端授权密码】
smtp_host: smtp.qq.com
smtp_port: 587

#用户认证文件,需要user和password两个属性
#注意将${userName}替换成具体用户名
smtp_auth_file: /Users/yangchj/soft/develop/elk/elastalert/smtp_auth_file.yaml
#回复给那个邮箱
email_reply_to: xxx@qq.com
#从哪个邮箱发送
from_addr: xx@qq.com



# (Required)
# The alert is use when a match is found
alert:
- "email"

# (required, email specific)
# a list of email addresses to send alerts to
email:
- "xxx@126.com"

测试规则

1
elastalert-test-rule ~/elastalert/example_rules/example_frequency.yaml

运行

1
2
cd ~/elastalert
python -m elastalert.elastalert --verbose --rule example_frequency.yaml

Docker 安装elastalert

1
docker pull bitsensor/elastalert

运行

1
2
3
4
5
6
7
docker run -d -p 3030:3030 \
-v `pwd`/config/elastalert.yaml:/opt/elastalert/config.yaml \
-v `pwd`/config/config.json:/opt/elastalert-server/config/config.json \
-v `pwd`/rules:/opt/elastalert/rules \
-v `pwd`/rules_templates:/opt/elastalert/rule_templates \
--net="host" \
--name elastalert bitsensor/elastalert:latest

陋室铭

分布式实时日志分析解决方案之ELKStack

发表于 2018-06-29 | 分类于 ELK | 阅读次数

概述

ELK Stack是目前最流行的集中式日志解决方案,它主要是由Elasticsearch、Logstash、Kibana、Beats等组件组成,来共同完成实时日志的收集,存储,
展示等一站式的解决方案。

  1. Filebeat: Filebeat是一款轻量级,占用服务资源非常少的数据收集引擎。可以代替LogStash作为应用服务端的日志收集引擎,支持将收集的数据输出到Kafka,Redis等队列。

  2. Logstash: 数据收集引擎,相较于Filebeat比较重量级,但集成了大量的插件,支持丰富的数据源收集,对收集的数据可以过滤,分析,格式化日志。

  3. Elasticsearch:分布式数据搜索引擎,基于Apache Lucene实现,可集群,提供数据的集中式存储,分析,以及强大的数据搜索和聚合功能

  4. Kibana:数据的可视化平台,通过web平台可以实时的查看Elasticsearch中的相关数据,并提供了丰富的图表统计功能。

ELK常见部署架构

  1. Logstash 作为日志收集器
    这种架构是比较原始的部署架构,在各应用服务器端分别部署一个Logstash组件,作为日志收集器,然后将Logstash收集到的数据过滤、分析、格式化处理后发送至Elasticsearch存储,最后使用Kibana进行可视化展示,这种架构不足的是:Logstash比较耗服务器资源,所以会增加应用服务器端的负载压力。

  2. Filebeat作为日志收集器
    该架构与第一种架构唯一不同的是:应用端日志收集器换成了Filebeat,Filebeat轻量,占用服务器资源少,所以使用Filebeat作为应用服务器端的日志收集器,一般Filebeat会配合Logstash一起使用,这种部署方式也是目前最常用的架构。

  3. 引入缓存队列的部署架构
    第一种部署架构由于资源占用问题,现已很少使用,目前使用最多的是第二种部署架构,至于第三种部署架构个人觉得没有必要引入消息队列,除非有其他需求,因为在数据量较大的情况下,Filebeat 使用压力敏感协议向 Logstash 或 Elasticsearch 发送数据。如果 Logstash 正在繁忙地处理数据,它会告知 Filebeat 减慢读取速度。拥塞解决后,Filebeat 将恢复初始速度并继续发送数据。

组件的安装与使用

Elasticsearch
修改Elasticsearch的配置文件config/elasticsearch.yml

1
2
network.host=localhost
network.port=9200

运行

1
nohup ./elasticsearch >>nohup.out 2>$1 &

访问 localhost:9200

logstash
创建配置文件config/logstash-filebeat.conf

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
input {
beats {
port => 5044
type => "service-log"
}
}
filter {
# multiline {
# pattern => "%{LOGLEVEL}\s*\]\[%{YEAR}%{MONTHNUM}%{MONTHDAY}\s+%{TIME}\]"
# negate => true
# what => "previous"
# }
if [fields][logType] == "service-error" {
grok {
# match => [ "message" , "(?<customer_time>%{YEAR}-%{MONTHNUM}-%{MONTHDAY}\s+%{TIME})"]
match => [
"message","%{TIMESTAMP_ISO8601:customer_time}%{GREEDYDATA:thread-id}%{LOGLEVEL:loglevel} (?<classname>[^\]]+)-%{GREEDYDATA:logDetail}-%{GREEDYDATA:msgbody}"
]
}
date {
match => ["customer_time", "yyyy-MM-dd HH:mm:ss.SSS"]
target => "@timestamp"
}

kv {
source => "msgbody"
field_split => "\|"
value_split => "="
}
}

}
output {
elasticsearch {
hosts => [ "localhost:9200" ]
index => "%{[@metadata][beat]}-%{+YYYY.MM.dd}"
document_type => "%{[@metadata][type]}"
}
}

注:log4j2.xml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="OFF">
<Properties>
<Property name="LOG_HOME">./</Property>
<Property name="LOG_NAME">producer-eu-c</Property>
<property name="LOG_PATTERN" value="%d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %-5level %logger{36} - %l - %msg%n" />
</Properties>
<Appenders>
<!--很直白,Console指定了结果输出到控制台-->
<Console name="ConsolePrint" target="SYSTEM_OUT">
<PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %-5level %logger{36} %l - %msg%n"/>
</Console>
<!--<File>输出结果到指定文件</File>-->
<!--<RollingFile>同样输出结果到指定文件,但是使用buffer,速度会快点</RollingFile>-->
<!--filePattern:表示当日志到达指定的大小或者时间,产生新日志时,旧日志的命名路径。-->
<!--PatternLayout:和log4j一样,指定输出日志的格式,append表示是否追加内容,值默认为true-->
<RollingFile name="RollingFileDebug" fileName="${LOG_HOME}/log/${LOG_NAME}-debug.log"
filePattern="${logFilePath}/log/$${date:yyyy-MM}/${LOG_NAME}-%d{yyyy-MM-dd}_%i.log.gz">
<PatternLayout pattern="${LOG_PATTERN}"/>
<!--注意,如果有多个ThresholdFilter,那么Filters标签是必须的-->
<Filters>
<!--首先需要过滤不符合的日志级别,把不需要的首先DENY掉,然后在ACCEPT需要的日志级别,次序不能颠倒-->
<!--INFO及以上级别拒绝输出-->
<ThresholdFilter level="INFO" onMatch="DENY" onMismatch="NEUTRAL"/>
<!--只输出DEBUG级别信息-->
<ThresholdFilter level="DEBUG" onMatch="ACCEPT" onMismatch="DENY"/>
</Filters>
<Policies>
<!--时间策略,每隔24小时产生新的日志文件-->
<TimeBasedTriggeringPolicy/>
<!--大小策略,每到30M时产生新的日志文件-->
<SizeBasedTriggeringPolicy size="30MB"/>
</Policies>
</RollingFile>
<RollingFile name="RollingFileInfo" fileName="${LOG_HOME}/log/${LOG_NAME}-info.log"
filePattern="${LOG_HOME}/log/$${date:yyyy-MM}/${LOG_NAME}-%d{yyyy-MM-dd}_%i.log.gz">
<Filters>
<!--onMatch:Action to take when the filter matches. The default value is NEUTRAL-->
<!--onMismatch: Action to take when the filter does not match. The default value is DENY-->
<!--级别在ERROR之上的都拒绝输出-->
<!--在组合过滤器中,接受使用NEUTRAL(中立),被第一个过滤器接受的日志信息,会继续用后面的过滤器进行过滤,只有符合所有过滤器条件的日志信息,才会被最终写入日志文件-->
<ThresholdFilter level="ERROR" onMatch="DENY" onMismatch="NEUTRAL"/>
<ThresholdFilter level="INFO" onMatch="ACCEPT" onMismatch="DENY"/>
</Filters>
<PatternLayout pattern="${LOG_PATTERN}"/>
<Policies>
<TimeBasedTriggeringPolicy/>
<SizeBasedTriggeringPolicy size="30MB"/>
</Policies>
</RollingFile>
<RollingFile name="RollingFileError" fileName="${LOG_HOME}/log/${LOG_NAME}-error.log"
filePattern="${LOG_HOME}/log/$${date:yyyy-MM}/${LOG_NAME}-%d{yyyy-MM-dd}_%i.log.gz">
<Filters>
<ThresholdFilter level="ERROR" onMatch="ACCEPT" onMismatch="DENY"/>
</Filters>
<PatternLayout pattern="${LOG_PATTERN}"/>
<Policies>
<TimeBasedTriggeringPolicy/>
<SizeBasedTriggeringPolicy size="30MB"/>
</Policies>
</RollingFile>
</Appenders>
<Loggers>
<!--logger用于定义log的level以及所采用的appender,如果无需自定义,可以使用root解决,root标签是log的默认输出形式-->
<!-- 级别顺序(低到高):TRACE < DEBUG < INFO < WARN < ERROR < FATAL -->

<logger name="com.neusoft" level="INFO">
<Appender-ref ref="RollingFileInfo"/>
<Appender-ref ref="RollingFileError"/>
</logger>

<!--<logger name="org.springframework" level="INFO">-->
<!--<Appender-ref ref="RollingFileInfo"/>-->
<!--</logger>-->
<Root level="INFO">
<!-- 只要是级别比ERROR高的,包括ERROR就输出到控制台 -->
<!--appender-ref中的值必须是在前面定义的appenders-->
<Appender-ref ref="ConsolePrint"/>
<!--<Appender-ref ref="RollingFileDebug"/>-->
</Root>
</Loggers>
</Configuration>

运行

1
nohup ./bin/logstash -f config/logstash-filebeat.conf >>nohup.out 2>&1 &

访问 localhost:9600

Filebeat
filebeat.yml配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
filebeat.inputs:
- type: log
enabled: true
paths:
- /Users/yangchj/work/kfw/kfw-demo-02/log/test*-info.log
multiline.pattern: ^\[
multiline.negate: false
multiline.match: after
filebeat.config.modules:
path: ${path.config}/modules.d/*.yml
reload.enabled: false

setup.template.settings:
index.number_of_shards: 3
setup.kibana:

output.logstash:
hosts: ["localhost:5044"]

启动

1
2
3
nohup ./filebeat -e -c filebeat.yml >>nohup.out 2>&1 &

nohup ./filebeat -e -c filebeat.yml >/dev/null 2>&1 &

kibana

1
nohup ./bin/kibana  >>nohup.out 2>&1 &

访问 http://localhost:5601/

问题及解决方案

  1. 日志多行合并功能
    使用Filebeat的multiline多行合并插件来实现,multiline在Filebeat中的配置方式:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    filebeat.inputs:
    - type: log
    paths:
    - /var/log/system.log
    - /var/log/wifi.log
    multiline.pattern: '^[0-9]{4}-[0-9]{2}-[0-9]{2}'
    multiline.negate: true
    multiline.match: after
    - type: log
    paths:
    - "/var/log/apache2/*"
    fields:
    apache: true
    fields_under_root: true
    multiline.pattern: '^[0-9]{4}-[0-9]{2}-[0-9]{2}'
    multiline.negate: true
    multiline.match: after
    output:
    logstash:
    hosts: ["localhost:5044"]
  • pattern:正则表达式
  • negate:默认为false,表示匹配pattern的行合并到上一行;true表示不匹配pattern的行合并到上一行
  • match:after表示合并到上一行的末尾,before表示合并到上一行的行首
  1. 如何将Kibana中显示日志的时间字段替换为日志信息中的时间?
    使用grok分词插件与date时间格式化插件来实现,在Logstash的配置文件的过滤器中配置grok分词插件与date时间格式化插件

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    input {
    beats {
    port => 5044
    }
    }

    filter {
    multiline {
    pattern => "%{LOGLEVEL}\s*\]\[%{YEAR}%{MONTHNUM}%{MONTHDAY}\s+%{TIME}\]"
    negate => true
    what => "previous"
    }
    grok {
    match => [ "message" , "(?<customer_time>%{YEAR}%{MONTHNUM}%{MONTHDAY}\s+%{TIME})" ]
    }
    date {
    match => ["customer_time", "yyyyMMdd HH:mm:ss,SSS"]
    target => "@timestamp"
    }
    }
    output {
    elasticsearch {
    hosts => "localhost:9200"
    }
    }
  2. 如何在Kibana中通过选择不同的系统日志模块来查看数据
    在Filebeat中的配置内容为:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    filebeat.inputs:
    - type: log
    paths:
    - /var/log/system.log
    - /var/log/wifi.log
    multiline.pattern: '^[0-9]{4}-[0-9]{2}-[0-9]{2}'
    multiline.negate: true
    multiline.match: after
    fields:
    log_from: serviceA #新增log_from字段
    - type: log
    paths:
    - "/var/log/apache2/*"
    fields:
    apache: true
    fields_under_root: true
    multiline.pattern: '^[0-9]{4}-[0-9]{2}-[0-9]{2}'
    multiline.negate: true
    multiline.match: after
    fields:
    log_from: serviceA #新增log_from字段
    output:
    logstash:
    hosts: ["localhost:5044"]
陋室铭

Kafka三款监控工具比较

发表于 2018-06-26 | 分类于 Kafka | 阅读次数

主流的三种Kafka监控程序

  • Kafka Web Console
  • Kafka Manager
  • KafkaOffsetMonitor

Kafka Web Console

使用Kafka Web Console,可以监控

  • Brokers列表
  • Kafka 集群中 Topic列表,及对应的Partition、LogSiz e等信息
  • 点击Topic,可以浏览对应的Consumer Groups、Offset、Lag等信息
  • 生产和消费流量图,消息预览

注: 唱给你发虚运行后,会定时去读取kafka集群分区的日志长度,读取完毕后,连接没有正常释放,一段时间后产生大量的socket连接,导致网络堵塞

Kafka Manager

雅虎开源的Kafka集群管理工具:

  • 管理几个不同的集群
  • 监控集群的状态(topics,brokers,副本分布,分区分布)
  • 产生分区分配,基于集群的当前状态
  • 重新分配分区

KafkaOffsetMonitor

  • KafkaOffsetMonitor 可以实时监控
  • Kafka集群状态
  • Topic、Consumer Group列表
  • 图形化展示topic和consumer之间的关系
  • 图形化展示consumer的Offset、Lag等信息

总结

  • Kafka Web Console:监控功能较为全面,可以预览消息,监控Offset、Lag等信息,但存在bug,不建议在生产环境中使用

  • Kafka Manager:偏向Kafka集群管理,若操作不当,容易导致集群出现故障。对Kafka实时生产和消费消息是通过JMX实现的。没有记录Offset、Lag等信息

  • KafkaOffsetMonitor:程序一个jar包的形式运行,部署较为方便。只有监控功能,使用起来也较为安全

若只需要监控功能,推荐使用KafkaOffsetMonitor,若偏重Kafka集群管理,推荐使用Kafka Manager

陋室铭

SpringBoot使用@Async实现异步调用之自定义线程池

发表于 2018-06-26 | 分类于 SpringBoot | 阅读次数

自定义线程池

在SpringBoot 主类中定义一个线程池

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@SpringBootApplication
public class Application {

public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}

@EnableAsync
@Configuration
class TaskPoolConfig {

@Bean("taskExecutor")
public Executor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(10);
executor.setMaxPoolSize(20);
executor.setQueueCapacity(200);
executor.setKeepAliveSeconds(60);
executor.setThreadNamePrefix("taskExecutor-");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
return executor;
}
}
}

上面通过ThreadPoolTaskExecutor 创建了一个线程池,同时设置了以下这些参数:

  • 核心线程数10:线程池创建时候初始化的线程数
  • 最大线程数20:线程池最大的线程数,只有在缓冲队列满了之后才会申请超过核心线程数的线程
  • 缓冲队列200:用来缓冲执行任务的队列
  • 允许线程的空闲时间60秒:当超过了核心线程出之外的线程在空闲时间到达之后会被销毁
  • 线程池名的前缀:设置好了之后可以方便我们定位处理任务所在的线程池
  • 线程池对拒绝任务的处理策略:这里采用了CallerRunsPolicy策略,当线程池没有处理能力的时候,该策略会直接在 execute 方法的调用线程中运行被拒绝的任务;如果执行程序已关闭,则会丢弃该任务

使用线程池

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
@Slf4j
@Component
public class Task {
public static Random random = new Random();
@Async("taskExecutor")
public void doTaskOne() throws Exception {
log.info("开始做任务一");
long start = System.currentTimeMillis();
Thread.sleep(random.nextInt(10000));
long end = System.currentTimeMillis();
log.info("完成任务一,耗时:" + (end - start) + "毫秒");
}

@Async("taskExecutor")
public void doTaskTwo() throws Exception {
log.info("开始做任务二");
long start = System.currentTimeMillis();
Thread.sleep(random.nextInt(10000));
long end = System.currentTimeMillis();
log.info("完成任务二,耗时:" + (end - start) + "毫秒");
}
@Async("taskExecutor")
public void doTaskThree() throws Exception {
log.info("开始做任务三");
long start = System.currentTimeMillis();
Thread.sleep(random.nextInt(10000));
long end = System.currentTimeMillis();
log.info("完成任务三,耗时:" + (end - start) + "毫秒");
}
}
陋室铭

Redis监控

发表于 2018-06-26 | 阅读次数

Redis监控之Redis-monitor

简介

Redis-monitor 一个可视化的redis 监控程序。使用FLask+sqlite,非常轻量级,非常容易使用和部署。

What

监控数据包括以下:

  • redis服务器信息,包括redis版本,上线时间,os系统信息等等。
  • 实时的消息处理信息,例如处理command数量、连接总数量等
  • 联通时间动态图表
  • ops时间动态图表
  • 内存占用、cpu消耗实时动态图表

使用

  1. 首先安装python库

    1
    pip install redis-monitor
  2. 初始化配置和数据库

    1
    redis-monitor init
  3. 启动webserver

    1
    2
    3
    redis-monitor start //默认端口9527

    redis-monitor start -p 9999

然后访问,127.0.0.1:9527

开源地址

陋室铭

Kafka监控之KafkaOffsetMonitor使用

发表于 2018-06-25 | 阅读次数

简述

KafkaOffsetMonitor是Kafka的一款客户端消费监控工具,用来实时监控Kafka服务的Consumer以及他们所在的Partition中的Offset,可以浏览当前的消费者组,并且每个Topic的所有Partition的消费情况一目了然。

注
经过测试,KafkaOffsetMonitor对kafka_2.10-0.10.2.1版本可以查看consumer groups。最新版本无法查看groups,部分功能不可用。

下载

官方下载
官方维护ing,支持Scala2.11版编译的Kafka
百度网盘

启动

方式一
直接启动

1
2
3
4
5
6
java -cp KafkaOffsetMonitor-assembly-0.3.0-SNAPSHOT.jar \
com.quantifind.kafka.offsetapp.OffsetGetterWeb \
--offsetStorage kafka --zk 127.0.0.1:2181 \
--port 8096 \
--refresh 10.seconds \
--retain 2.day 1>./logs/stdout.log 2>./logs/stderr.log &

方式二
编写脚本kafka-monitor-start-0.3.0.sh

1
2
3
4
5
6
7
#!/bin/bash
java -cp KafkaOffsetMonitor-assembly-0.3.0-SNAPSHOT.jar \
com.quantifind.kafka.offsetapp.OffsetGetterWeb \
--offsetStorage kafka --zk 127.0.0.1:2181 \
--port 8096 \
--refresh 10.seconds \
--retain 2.day 1>./logs/stdout.log 2>./logs/stderr.log &

各个参数的含义

  • offsetStorage:有效的选项是”zookeeper”,”kafka”,”storm”。0.9版本以后,offset存储的位置在kafka。

  • zk: zookeeper的地址

  • port: 端口号
  • refresh 刷新频率,更新到DB
  • retain 保留DB的时间
  • dbName 在哪里存储(默认offsetapp)
陋室铭

使用gradle发布项目到Jcenter仓库

发表于 2018-06-24 | 分类于 android | 阅读次数

申请Bintray账号

Bintray的基本功能类似于Maven Central,申请账号,进行注册。

登录后,点击edit打开编辑,然后点击api key

项目中配置

项目的build.gradle添加构建依赖,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
buildscript {

repositories {
google()
jcenter()
}
dependencies {
classpath 'com.android.tools.build:gradle:3.1.3'
classpath 'com.jfrog.bintray.gradle:gradle-bintray-plugin:1.6'
classpath 'com.github.dcendents:android-maven-gradle-plugin:1.3'
}
}

allprojects {
repositories {
google()
jcenter()
}
}

task clean(type: Delete) {
delete rootProject.buildDir
}

  1. 在.gradle目录下,编辑gradle.properties加入配置

    1
    2
    BINTRAY_USER=xxx  
    BINTRAY_KEY=xxx
  2. 在本地项目的local.properties,添加如上配置

在gradle.properties文件下配置

1
2
3
4
5
6
7
8
9
10
11
12
PROJ_GROUP=com.android.icefire
PROJ_VERSION=1.0.1
PROJ_NAME=wifimanagerlib
PROJ_WEBSITEURL=https://github.com/ycj0808/wifimanager
PROJ_ISSUETRACKERURL=https://github.com/ycj0808/wifimanager/issues
PROJ_VCSURL=https://github.com/ycj0808/wifimanager.git
PROJ_DESCRIPTION=WIFI manager
PROJ_ARTIFACTID=wifimanagerlib

DEVELOPER_ID=icefire
DEVELOPER_NAME=icefire
DEVELOPER_EMAIL=yangchj@icefire.me

添加bintray.gradle文件,内容如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
group = PROJ_GROUP
version = PROJ_VERSION
project.archivesBaseName = PROJ_ARTIFACTID

apply plugin: 'com.jfrog.bintray'
apply plugin: 'maven-publish'

task sourcesJar(type: Jar) {
from android.sourceSets.main.java.srcDirs
classifier = 'sources'
}

task javadoc(type: Javadoc) {
source = android.sourceSets.main.java.srcDirs
classpath += configurations.compile
classpath += project.files(android.getBootClasspath().join(File.pathSeparator))
}

task javadocJar(type: Jar, dependsOn: javadoc) {
classifier = 'javadoc'
from javadoc.destinationDir
}

javadoc {
options{
encoding "UTF-8"
charSet 'UTF-8'
author true
version true
links "http://docs.oracle.com/javase/7/docs/api"
title PROJ_ARTIFACTID
}
}

artifacts {
archives javadocJar
archives sourcesJar
}

def pomConfig = {
licenses {
license {
name "The Apache Software License, Version 2.0"
url "http://www.apache.org/licenses/LICENSE-2.0.txt"
distribution "repo"
}
}
developers {
developer {
id DEVELOPER_ID
name DEVELOPER_NAME
email DEVELOPER_EMAIL
}
}
}

/*publishing {
publications {
mavenJava(MavenPublication) {
artifactId PROJ_ARTIFACTID
pom{
packaging 'aar'
}
pom.withXml {
def root = asNode()
root.appendNode('description', PROJ_DESCRIPTION)
root.children().last() + pomConfig
}
}
}
}*/


install {
repositories.mavenInstaller {
pom.project {
packaging 'aar'
name COMMON_NOTIFICATION_NAME
url PROJ_WEBSITEURL
licenses {
license {
name "The Apache Software License, Version 2.0"
url "http://www.apache.org/licenses/LICENSE-2.0.txt"
distribution "repo"
}
}
developers {
developer {
id DEVELOPER_ID
name DEVELOPER_NAME
email DEVELOPER_EMAIL
}
}

scm {
connection PROJ_WEBSITEURL
developerConnection PROJ_WEBSITEURL
url PROJ_WEBSITEURL
}
}
}
}

bintray {
Properties properties = new Properties()
properties.load(project.rootProject.file('local.properties').newDataInputStream())

user = properties.getProperty('BINTRAY_USER');
key = properties.getProperty('BINTRAY_KEY');

configurations = ['archives']
publications = ['mavenJava']
publish = true

pkg {
repo = 'maven'
name = PROJ_NAME
desc = PROJ_DESCRIPTION
websiteUrl = PROJ_WEBSITEURL
issueTrackerUrl = PROJ_ISSUETRACKERURL
vcsUrl = PROJ_VCSURL
licenses = ['Apache-2.0']
publicDownloadNumbers = true
}
}

在libray的build.gradle下添加

1
2
//文件的最下方添加
apply from: '../bintray.gradle'

执行发布命令

1
2
3

./gradlew install
./gradlew bintrayUpload

将库加入Jcenter

登录 bintray.com,刚刚发布的库申请加入到jcenter,进入此页面,点击Include My Package,然后在弹出的对话框中搜索并勾上你的项目。然后你可以写一下你的提交请求(貌似也可以不写?),点“Send”,接下来就看管理员审核了。

陋室铭

Redis详解

发表于 2018-06-15 | 阅读次数

(1)什么是redis?

Redis 是一个基于内存的高性能key-value数据库。

(2)Reids的特点

Redis本质上是一个Key-Value类型的内存数据库,很像memcached,整个数据库统统加载在内存当中进行操作,定期通过异步操作把数据库数据flush到硬盘上进行保存。因为是纯内存操作,Redis的性能非常出色,每秒可以处理超过 10万次读写操作,是已知性能最快的Key-Value DB。

Redis的出色之处不仅仅是性能,Redis最大的魅力是支持保存多种数据结构,此外单个value的最大限制是1GB,不像 memcached只能保存1MB的数据,因此Redis可以用来实现很多有用的功能,比方说用他的List来做FIFO双向链表,实现一个轻量级的高性 能消息队列服务,用他的Set可以做高性能的tag系统等等。
另外Redis也可以对存入的Key-Value设置expire时间,因此也可以被当作一 个功能加强版的memcached来用。

Redis的主要缺点是数据库容量受到物理内存的限制,不能用作海量数据的高性能读写,因此Redis适合的场景主要局限在较小数据量的高性能操作和运算上。

(3)Redis支持的数据类型

Redis通过Key-Value的单值不同类型来区分, 以下是支持的类型:
Strings
Lists
Sets 求交集、并集
Sorted Set
hashes

(4)为什么redis需要把所有数据放到内存中?

Redis为了达到最快的读写速度将数据都读到内存中,并通过异步的方式将数据写入磁盘。所以redis具有快速和数据持久化的特征。如果不将数据放在内存中,磁盘I/O速度为严重影响redis的性能。在内存越来越便宜的今天,redis将会越来越受欢迎。
如果设置了最大使用的内存,则数据已有记录数达到内存限值后不能继续插入新值。

(5)Redis是单进程单线程的

redis利用队列技术将并发访问变为串行访问,消除了传统数据库串行控制的开销

(6)虚拟内存

当你的key很小而value很大时,使用VM的效果会比较好.因为这样节约的内存比较大.
当你的key不小时,可以考虑使用一些非常方法将很大的key变成很大的value,比如你可以考虑将key,value组合成一个新的value.
vm-max-threads这个参数,可以设置访问swap文件的线程数,设置最好不要超过机器的核数,如果设置为0,那么所有对swap文件的操作都是串行的.可能会造成比较长时间的延迟,但是对数据完整性有很好的保证.

自己测试的时候发现用虚拟内存性能也不错。如果数据量很大,可以考虑分布式或者其他数据库

(7)分布式

redis支持主从的模式。原则:Master会将数据同步到slave,而slave不会将数据同步到master。Slave启动时会连接master来同步数据。

这是一个典型的分布式读写分离模型。我们可以利用master来插入数据,slave提供检索服务。这样可以有效减少单个机器的并发访问数量

(8)读写分离模型

通过增加Slave DB的数量,读的性能可以线性增长。为了避免Master DB的单点故障,集群一般都会采用两台Master DB做双机热备,所以整个集群的读和写的可用性都非常高。
读写分离架构的缺陷在于,不管是Master还是Slave,每个节点都必须保存完整的数据,如果在数据量很大的情况下,集群的扩展能力还是受限于单个节点的存储能力,而且对于Write-intensive类型的应用,读写分离架构并不适合。

​

(9)数据分片模型

为了解决读写分离模型的缺陷,可以将数据分片模型应用进来。

可以将每个节点看成都是独立的master,然后通过业务实现数据分片。

结合上面两种模型,可以将每个master设计成由一个master和多个slave组成的模型。

(10)Redis的回收策略

    • volatile-lru:从已设置过期时间的数据集(server.db[i].expires)中挑选最近最少使用的数据淘汰

    • volatile-ttl:从已设置过期时间的数据集(server.db[i].expires)中挑选将要过期的数据淘汰

    • volatile-random:从已设置过期时间的数据集(server.db[i].expires)中任意选择数据淘汰

    • allkeys-lru:从数据集(server.db[i].dict)中挑选最近最少使用的数据淘汰

    • allkeys-random:从数据集(server.db[i].dict)中任意选择数据淘汰

    • no-enviction(驱逐):禁止驱逐数据

      • 1. 使用Redis有哪些好处?

        (1) 速度快,因为数据存在内存中,类似于HashMap,HashMap的优势就是查找和操作的时间复杂度都是O(1)

        (2) 支持丰富数据类型,支持string,list,set,sorted set,hash

        (3) 支持事务,操作都是原子性,所谓的原子性就是对数据的更改要么全部执行,要么全部不执行

        (4) 丰富的特性:可用于缓存,消息,按key设置过期时间,过期后将会自动删除

        2. redis相比memcached有哪些优势?

        (1) memcached所有的值均是简单的字符串,redis作为其替代者,支持更为丰富的数据类型

        (2) redis的速度比memcached快很多

        (3) redis可以持久化其数据

        3. redis常见性能问题和解决方案:

        (1) Master最好不要做任何持久化工作,如RDB内存快照和AOF日志文件

        (2) 如果数据比较重要,某个Slave开启AOF备份数据,策略设置为每秒同步一次

        (3) 为了主从复制的速度和连接的稳定性,Master和Slave最好在同一个局域网内

        (4) 尽量避免在压力很大的主库上增加从库

        (5) 主从复制不要用图状结构,用单向链表结构更为稳定,即:Master <- Slave1 <- Slave2 <- Slave3…

        这样的结构方便解决单点故障问题,实现Slave对Master的替换。如果Master挂了,可以立刻启用Slave1做Master,其他不变。

**4. MySQL里有2000w数据,redis中只存20w的数据,如何保证redis中的数据都是热点数据**

 相关知识:redis 内存数据集大小上升到一定大小的时候,就会施行数据淘汰策略。redis 提供 6种数据淘汰策略:

voltile-lru:从已设置过期时间的数据集(server.db[i].expires)中挑选最近最少使用的数据淘汰

volatile-ttl:从已设置过期时间的数据集(server.db[i].expires)中挑选将要过期的数据淘汰

volatile-random:从已设置过期时间的数据集(server.db[i].expires)中任意选择数据淘汰

allkeys-lru:从数据集(server.db[i].dict)中挑选最近最少使用的数据淘汰

allkeys-random:从数据集(server.db[i].dict)中任意选择数据淘汰

no-enviction(驱逐):禁止驱逐数据



**5. Memcache与Redis的区别都有哪些?**

1)、存储方式

Memecache把数据全部存在内存之中,断电后会挂掉,数据不能超过内存大小。

Redis有部份存在硬盘上,这样能保证数据的持久性。

2)、数据支持类型

Memcache对数据类型支持相对简单。

Redis有复杂的数据类型。

3)、使用底层模型不同

它们之间底层实现方式 以及与客户端之间通信的应用协议不一样。

Redis直接自己构建了VM 机制 ,因为一般的系统调用系统函数的话,会浪费一定的时间去移动和请求。

4),value大小

redis最大可以达到1GB,而memcache只有1MB

**6. Redis 常见的性能问题都有哪些?如何解决?**



1).Master写内存快照,save命令调度rdbSave函数,会阻塞主线程的工作,当快照比较大时对性能影响是非常大的,会间断性暂停服务,所以Master最好不要写内存快照。



2).Master AOF持久化,如果不重写AOF文件,这个持久化方式对性能的影响是最小的,但是AOF文件会不断增大,AOF文件过大会影响Master重启的恢复速度。Master最好不要做任何持久化工作,包括内存快照和AOF日志文件,特别是不要启用内存快照做持久化,如果数据比较关键,某个Slave开启AOF备份数据,策略为每秒同步一次。



3).Master调用BGREWRITEAOF重写AOF文件,AOF在重写的时候会占大量的CPU和内存资源,导致服务load过高,出现短暂服务暂停现象。

4). Redis主从复制的性能问题,为了主从复制的速度和连接的稳定性,Slave和Master最好在同一个局域网内

​

****

**7, redis 最适合的场景**



Redis最适合所有数据in-momory的场景,虽然Redis也提供持久化功能,但实际更多的是一个disk-backed的功能,跟传统意义上的持久化有比较大的差别,那么可能大家就会有疑问,似乎Redis更像一个加强版的Memcached,那么何时使用Memcached,何时使用Redis呢?

​       如果简单地比较Redis与Memcached的区别,大多数都会得到以下观点:

​     1 、Redis不仅仅支持简单的k/v类型的数据,同时还提供list,set,zset,hash等[数据结构](http://lib.csdn.net/base/datastructure)的存储。
​     2 、Redis支持数据的备份,即master-slave模式的数据备份。
​     3 、Redis支持数据的持久化,可以将内存中的数据保持在磁盘中,重启的时候可以再次加载进行使用。

### (1)、会话缓存(Session Cache)

最常用的一种使用Redis的情景是会话缓存(session cache)。用Redis缓存会话比其他存储(如Memcached)的优势在于:Redis提供持久化。当维护一个不是严格要求一致性的缓存时,如果用户的购物车信息全部丢失,大部分人都会不高兴的,现在,他们还会这样吗?

幸运的是,随着 Redis 这些年的改进,很容易找到怎么恰当的使用Redis来缓存会话的文档。甚至广为人知的商业平台Magento也提供Redis的插件。

### (2)、全页缓存(FPC)

除基本的会话token之外,Redis还提供很简便的FPC平台。回到一致性问题,即使重启了Redis实例,因为有磁盘的持久化,用户也不会看到页面加载速度的下降,这是一个极大改进,类似[PHP](http://lib.csdn.net/base/php)本地FPC。

再次以Magento为例,Magento提供一个插件来使用Redis作为[全页缓存后端](https://github.com/colinmollenhour/Cm_Cache_Backend_Redis)。

此外,对WordPress的用户来说,Pantheon有一个非常好的插件  [wp-redis](https://wordpress.org/plugins/wp-redis/),这个插件能帮助你以最快速度加载你曾浏览过的页面。

### (3)、队列

Reids在内存存储引擎领域的一大优点是提供 list 和 set 操作,这使得Redis能作为一个很好的消息队列平台来使用。Redis作为队列使用的操作,就类似于本地程序语言(如[Python](http://lib.csdn.net/base/python))对 list 的 push/pop 操作。

如果你快速的在Google中搜索“Redis queues”,你马上就能找到大量的开源项目,这些项目的目的就是利用Redis创建非常好的后端工具,以满足各种队列需求。例如,Celery有一个后台就是使用Redis作为broker,你可以从[这里](http://celery.readthedocs.org/en/latest/getting-started/brokers/redis.html)去查看。

### (4),排行榜/计数器

Redis在内存中对数字进行递增或递减的操作实现的非常好。集合(Set)和有序集合(Sorted Set)也使得我们在执行这些操作的时候变的非常简单,Redis只是正好提供了这两种数据结构。所以,我们要从排序集合中获取到排名最靠前的10个用户–我们称之为“user_scores”,我们只需要像下面一样执行即可:

当然,这是假定你是根据你用户的分数做递增的排序。如果你想返回用户及用户的分数,你需要这样执行:

ZRANGE user_scores 0 10 WITHSCORES

Agora Games就是一个很好的例子,用Ruby实现的,它的排行榜就是使用Redis来存储数据的,你可以在[这里]()看到。

### (5)、发布/订阅

最后(但肯定不是最不重要的)是Redis的发布/订阅功能。发布/订阅的使用场景确实非常多。我已看见人们在社交网络连接中使用,还可作为基于发布/订阅的脚本触发器,甚至用Redis的发布/订阅功能来建立聊天系统!(不,这是真的,你可以去核实)。

Redis提供的所有特性中,我感觉这个是喜欢的人最少的一个,虽然它为用户提供如果此多功能。
1234…6
icefire

icefire

Stick with it,and keep moving.

53 日志
15 分类
21 标签
GitHub
© 2016 - 2021 icefire 辽ICP备16011524号-1
由 Hexo 强力驱动
主题 - NexT.Pisces