一、开篇
了解“认知心理学”的朋友应该知道,人类对事物的认知,总是由浅入深。然而,每个人思考的深度千差万别,关键在于思考的方式。通过提问三部曲——WHAT->HOW->WHY,可以帮助我们一步步地从事物的表象深入到事物的本质。比如学习一个新的技术框架,需要逐步搞清楚她是什么、如何使用、为什么这样设计,由浅入深。
“WHY+HOW+WHAT”,是笔者最钟爱的一种思维模式。其使用方法不仅限于上述认知过程中的思考方式,通过不同的顺序组合,可以使用在不同的场景。比如,在筹划一个项目时,采用“WHY->WHAT->HOW”的思考方式,先搞清楚为什么要做这个项目,然后是需要做哪些工作来完成这个项目,最后考虑怎么做、技术选型。这个思考方式也将被广泛使用在本系列的各个文章中。
在创业公司中做数据相关的事情,而且是从零做起,肯定不像很多大公司那样分工明细,所有的工作都要保证在有限的资源下来满足需求。笔者在此做些总结分享,结合我们的系统来谈一谈如何做数据分析。如果有写的不好的地方,还请指正。
作为系列文章的开篇,本文将按照“WHY->WHAT->HOW”的思考方式来阐述下面三个问题:
- 创业公司为什么需要做数据分析?
- 创业公司做数据分析,需要做哪些事情?
- 如何实现这些数据上的需求?
WHY
随着移动互联网的发展和大数据思维的普及,越来越多的创业者、投资人开始重视数据的作用,而不再是随便拍脑袋。“数据驱动决策”、“精准化运营”、“产品快速迭代”这些概念被越来越多的人提出和使用,其背后都离不开精准的数据分析。对于大多数互联网创业公司来说,其背后没有强大的资源与财主支撑,如何在有限的人力、物力下快速摸索、少走弯路是至关重要的,而基于“数据驱动”来做决策、运营与产品将起到一个关键的作用。让我们来看两个例子。
【例一】
微信公众号早已成为各家运营的主战场之一,利用微信的关系链来转发H5海报页面是众多线上活动和拉新的一个重要方式。然而,不管是做某个线上推广活动,还是通过线下某个渠道引导用户分享、注册,我们都需要指标来衡量活动效果,从而摸清运营的方向。
数据,便是关键!该活动带来的浏览量、分享量、新注册用户数、用户留存率都是重要的指标,而这一切都离不开有效的数据追踪与分析。如果同时有100个这样的渠道活动,如何统筹各个数据分析也将是一件无法忽视的事情。(下图呈现的是某次活动的传播网络的一部分)
【例二】
每逢节假日,国内各个旅游景点都是人山人海,尽管大家都知道外出游玩会遭遇这种情况,但是还是抱着一丝侥幸心理出行,毕竟好不容易有了假期嘛。在十一时,笔者就曾利用百度景区热力分布图来提前观察,从而避开了一些高峰期和人满为患的景区,大家不妨也试一试。
对于很多创业公司,特别基于LBS提供服务的企业来说,都期望搞清楚“用户在哪里”、“哪里是用户感兴趣的地方”,从而摸清早期的投入方向,毕竟全面开花、四处征战的方式是不适于创业公司的。
通过位置数据,来分析用户集中在哪些区域,主要分布在商业区还是高校,是否受到交通因素影响等等,当然,具体需要结合业务来做了。另一方面,还可以聚合出用户的常驻位置,可以对用户位置与商户位置的距离进行分析等等,从而形成推荐方案,优化产品与服务。
WHAT
对于大多数互联网创业公司,在做数据分析时,一定要结合自己的业务,把握一个度,在投入可控的范围内达到效果即可。数据深度挖掘、机器学习、推荐算法等等,这些技术名词背后都需要投入一定的人力、物力来支撑,即使是大厂来玩,产出也相对有限,而且很多时候实际工程效果不尽人意。举个列子,很多高端的“推荐算法”在投入使用后,其效果远不如“看了又看”来的简单有效。当然,如果你的公司就是做数据这方面的业务,那是另一回事了。
要搞清楚需要做什么,不妨先结合自身业务思考一下,现阶段自己需要什么数据来驱动决策、运营与产品。具体业务方面的数据需求,各家都不一样。从笔者接触的情况来看,早期大部分的数据需求集中在两块:运营数据的统计分析、产品使用情况的统计分析。后期随着产品线的发展,一般会延伸出一些与产品相关的数据业务,比如线上推荐。
从流程上看,需要做的事情集中在三部分:数据采集、数据处理和数据可视化。伴随着数据的变迁:原始数据->分析结果->图表呈现。
首先,基础数据源的建设是做好数据分析的关键,因为如果数据源本身出了问题,那么后面做的所有工作都是没有意义的,而且如果没有提前做好数据采集,后期想做分析时也没有数据可做。
其次,数据分析的最终结果是需要呈现给别人看的,可能是公司高层,也可能是市场业务人员,直接将一堆数据丢给他们显然是不现实的,通常都需要转换为图表的形式,这便是数据可视化的工作。而从原始数据源到分析结果的过程,便归纳为数据处理,其涵盖了数据提取、数据建模、数据分析等多个步骤。
HOW
现如今国内的互联网环境发展的越来越好,第三方服务提供商越来越多。所以很多情况下我们都有两个选择:接入第三方、自己做。
- 接入第三方。数据分析这块,便有很多第三方服务,笔者将其划分为传统数据统计服务与新兴的数据公司。前者以百度统计、google analysis为代表,通过嵌入其SDK在前端采集数据,在后台便可以查看相应的统计数据。这种方式的好处是简单、免费,使用非常普及,是很多初创企业的首选。缺点也很明显,一是这样的统计只能分析一些基本的访问量、点击率、活跃用户量,满足基本需求,无法结合业务数据来做深度分析;二是需要在前端很多地方埋点上报,耦合性较强;三是数据存储在第三方的服务器中,无法直接获取到数据源。后者以神策、GrowingIO、诸葛IO为代表,这些公司也正是看到了传统数据统计服务的缺点,从而提出相应的解决方案,各有特色。但是,需要不菲的接入费用,私有部署的费用更多,而这笔费用对于一个初创企业来说,还是蛮多的。另一方面他们更加侧重于电商领域的数据分析,因为这个领域的分析模式已经基本成型,适合做成模板来使用。
- 自己做。选择自己做的话,可以结合自身的业务,做的更灵活,同时也可以尽早摸索数据业务,逐步建立相应的数据系统。当然,自己做并不代表是造轮子,而是要充分利用开源框架来实现相应的功能。
鉴于各家的业务都不同,而抛开业务谈架构都是耍流氓,所以在接下来的文章中,笔者将结合自己接触的业务来探讨一些数据系统的实现。
下图所示便是现阶段我们的数据系统架构,主要分为数据采集、数据处理与数据应用三层。
- 数据采集。从下往上,数据采集层负责从前端App、H5页面、服务器日志采集数据,通过Kafka接入后存入Elasticsearch与neo4j中,同时业务数据库也是很重要的数据源;
- 数据处理。数据处理层负责数据的抽取、清洗、建模,然后存入MongoDB与MySQL中,整个过程由Airflow任务调度管理系统来进行管理与监控;
- 数据应用。产出的数据最终提供给应用层使用。
也许有人要说,连Hadoop都没用到,怎么号称自己在做数据分析呢。笔者曾经也做过考虑和尝试,最终暂时搁置了Hadoop,主要是数据增长相对缓慢并且没有很明显的需求,目前这个架构可以在较长一段时间内应对数据需求了。
二、运营数据系统
我们首先来探讨应用层中的运营数据系统,因为运营数据几乎是所有互联网创业公司开始做数据的起点,也是早期数据服务的主要对象。本文将着重回顾下我们做了哪些工作、遇到过哪些问题、如何解决并实现了相应的功能。
早期数据服务
产品上线开始推广后不久,后台研发人员便会经常收到运营同事的私信:“能不能查一下有多少用户注册了,来自哪里?……..”。几次之后,大家便觉得这样的效率太低了:研发人员需要在繁忙的开发任务中抽时间来做数据查询、统计,而运营同事则需要等很久才能拿到数据。于是,大家开始协商更好的方法,最终达成一致:由运营同事提供所需的数据模板,后台研发人员根据模板将数据导入Excel文件,运营同事可根据自身需求自己分析统计。这便是早期的数据服务了,其组成结构如下图所示。
这样的做法简单明了,后台研发人员根据数据模板写一个Python脚本,从业务数据库中将数据捞出来,做些分析、整合后,将结果输出到一个Excel文件,然后发送邮件通知运营同事接收文件。然而,随着需求的增加和细化、数据量的增加,暴露的问题越来越多,这里先罗列出来,这些问题有的会在本文提出解决方案,有的则会在后面的文章中陆续提出解决方案。
- Worker越来越多,分布在多个地方,存在很多重复的劳动和代码,某个逻辑的修改需要改很多文件。
- 由于使用ORM来访问数据库,很多代码只考虑逻辑,没考虑到查询数据的效率问题,导致有些报告需要跑十几个小时才能出结果。
- 中间计算结果流失,数据没有共享,每个Worker都要跑自己的逻辑去算一遍。
- Woker依靠crontab来控制触发,没有监管,经常由于脏数据导致中断,需要等到运营同事发现后报过来才知道。
运营数据Dashboard
随着业务的发展,以数据报表的形式来提供数据服务逐渐不能满足需求了。一方面,高层期望每天一早便能看到清晰的数据,搞清楚最近的运营效果和趋势;另一方面,虽然数据报表提供了详细的数据,但是还是需要手动去过滤、统计一下才有结果,所有想看数据的人都需要做一遍才行,而业务人员处理Excel的水平层次不齐。
于是,我们开始筹划Dashboard系统,以Web的形式提供数据可视化服务。可是,Dashboard要做成什么样子?由于产品经理和设计人员都忙于产品业务,所以只能自己考虑要做什么、怎么做。好在笔者之前用过百度统计,对那里面的一些统计服务比较清楚,结合公司的业务,形成了一些思路:
- 数据内容上,包含:核心指标数据和图表分析两部分。前者以曲线图为主,要能快速显示数量和趋势,比如注册日增量趋势图;后者使用各种图表来展现某个时间段内的分析结果,比如10月份的TOP10用户感兴趣品牌。
- 数据类型上,包含:C端核心指标、B端核心指标、核心分析和专题活动指标与分析。前两者是分别针对C端和B端的指标数据,核心分析是一些综合的分析,比如转化率分析,专题活动是针对一些特定的大型运营活动。
- 数据维度上,包含:时间维度、城市维度和B端品牌维度。时间是最基本最重要的维度,城市维度可以分析各个运营大区的状态,B端品牌维度主要是针对B端上的业务。
整理后便形成了下图所示的Mockup(简化版),基本涵盖了上述的思路。虽然在美观上相对欠缺,但是毕竟是内部使用嘛,重要的数据显示要能准确、快速。
整体架构
系统的整体架构如下图所示,主要基于这么几点考虑:
- 前后端分离。前端只负责加载图表、请求数据并显示,不做任何数据逻辑处理;后端负责产出数据,并提供REST API与前端交互。
- 离线与实时计算并存。为了提高数据获取的速度,曲线指标数据采用离线计算的方式,提供历史数据供前端展示;图表分析类数据采用实时计算的方式,其速度取决于所选时间段内的数据量,必要时进行缓存。
前端实现
Dashboard系统的前端并不复杂,前面也提到我们不会做太多样式上的工作,重点是数据的显示。那么,第一件事就是要寻找一款图表库。笔者这里选择的是百度ECharts,其提供了丰富的图表类型,可视化效果很棒,对移动端的支持很友好,重要的是有详细的示例和文档。事实证明ECharts确实很强大,很好的满足了我们的各种需求。
选好了图表库,接下来的问题是如何优雅的加载几十个图表,甚至更多。这就需要找到图表显示共性的地方(行为和属性),进行抽象。通常,使用ECharts显示一个数据图表需要四步(官方文档):
- 第一步,引入ECharts的JS文件;
- 第二步,声明一个DIV作为图表的容器;
- 第三步,初始化一个echart实例,将其与DIV元素绑定,并初始化配置项;
- 第四步,加载图表的数据并显示。
可以发现,行为上主要分为初始化和更新数据两个,属性上主要是初始配置项和数据。
基于此,笔者使用“Pattern+Engine”的思想来实现前端数据加载。首先,在JS中使用JSON对每个图表进行配置,即写Pattern。例如,下面的配置便对应了一个图表,elementId是DIV的id,title是图表的标题,names是图表的曲线名称,url提供了获取数据的API,loader表示要加载的图表Engine。而一个页面的图表便由一组这样的配置项组成。
{ elementId: 'register_status_app_daily', title: 'App注册统计(日增量)', names: ['用户数'], url: '/api/dashboard/register_status_daily/', loader: 'line_loader' }
页面加载时,根据Pattern中的配置项生成相应的Loader Engine实例,用来初始化图表和更新数据。每个Loader对应一个ECharts图表类型,因为不同图表类型的初始化和加载数据的方法不同。程序类图如下所示。
后端实现
前面提到在早期的数据服务中,存在很多重复劳动和代码,因此在Dashboard系统的后端实现中,笔者开始考虑构建数据分析的公共库,这块占据了很大一部分工作量。底层公共库不针对任何特殊业务需求,主要负责三件事:
- 第一,封装数据源连接方法;
- 第二,封装时间序列的生成方法,产生以天、周、月为间隔的时间序列;
- 第三,封装基础的数据查询、清洗、统计、分析方法,形成格式化的数据,这部分是最重要的。
完成了底层公共库的构建后,整个代码结构一下子就清爽了很多。在其基础上,开始构建上层的Analyzer。Analyzer用于完成具体的数据分析需求,每个Analyzer负责一个或多个数据指标的产出,每个曲线图/图表的数据由一个Analyzer来负责。离线计算与实时计算,则是分别在Schedule和Web请求的触发下,调用对应的Analyzer来完成数据产出。因此,整个后台系统分为三层来实现,如下图所示。
离线数据
最后谈一谈离线数据的问题。目前离线计算是由Schedule来触发,每日零点计算前一日的数据,数据按照“每个指标在不同维度上每天一个数据点”的原则来生成,由上述的Analyzer来负责产出格式化的数据,存入MongoDB中。由于查询规则简单,只需建立一个组合索引就可以解决效率问题了。目前数据量在500W左右,暂时没有出现性能问题,后期可以考虑将部分历史数据迁移,当然这是后话。
数据报表
Dashboard上线后,我们开始考虑将早期的数据报表服务逐步停下来,减少维护的成本。而运营同事希望能继续保留部分报表,因为Dashboard虽然提供了很多数据指标和分析,但是有些工作需要更精细的数据信息来做,比如给带来微信注册的校园代理结算工资、对新注册用户电话回访等等。经过梳理和协商,最终保留了六个数据报表。另一方面,B端的商家期望能在后台导出自己的相关数据。
综合两方面需求,笔者构建了新的数据报表系统。
新的数据报表系统,按照流程来划分为三部分:触发、执行与通知。
- 触发。内部数据报表依旧由Schedule触发,启动相应的Worker进程来执行;
- 执行。而提供给外部的报表由Web前端通过REST API来触发,将相应的任务加入Celery任务队列中执行。执行体由一组Exporter来完成,Exporter负责获取数据、生成适合写入Excel的数据格式、写Excel文件,数据获取部分依赖前面所述的底层公共库。
- 通知。最后,统一发送邮件通知。
考虑到早期数据服务中经常遇到异常导致生成报表失败的问题,笔者在新的数据报表系统中做了两点与异常相关的处理:
- 使用Airflow对Schedule触发的任务进行监控,手动触发的任务则由Celery进行监控,遇到异常便发送邮件通知到开发人员。
- 如果一个Excel数据文件由多个Sheet组成,当某个Sheet出现异常时,通常由两种处理方法,一是丢弃整个文件,二是保留其他Sheet信息继续生成Excel文件。这里,内部报告使用了第二种处理方法,外部报告相对严谨,使用了第一种。
以上便是笔者所在公司的运营数据系统的发展历程和现状,目前Dashboard与数据报表两个系统已经趋于稳定,基本提供了90%以上的运营数据服务。当然,随着数据量的增长、业务需求的发展,一定会面临更多新的挑战。
三、用户行为数据采集系统
本章节将重点探讨数据采集层中的用户行为数据采集系统。
用户行为是什么
这里的用户行为,指的是用户与产品UI的交互行为,主要表现在Android App、IOS App与Web页面上。这些交互行为,有的会与后端服务通信,有的仅仅引起前端UI的变化,但是不管是哪种行为,其背后总是伴随着一组属性数据。
- 对于与后端发生交互的行为,我们可以从后端服务日志、业务数据库中拿到相关数据;
- 而对于那些仅仅发生在前端的行为,则需要依靠前端主动上报给后端才能知晓。
用户行为数据采集系统,便是用于从前端采集所需完整用户行为信息,并用于数据分析和其他业务的系统。
举个例子,下图所示是一次营销活动(简化版)的注册流程。如果仅仅依靠后端业务数据库,我们只能知道活动带来了多少新注册用户。而通过采集用户在前端的操作行为,则可以分析出整个活动的转化情况:海报页面浏览量—>>点击”立即注册”跳转注册页面量—>>点击“获取验证码”数量—>>提交注册信息数量—>>真实注册用户量。
而前端用户行为数据的价值不仅限于这样的转化率分析,还可以挖掘出更多的有用信息,甚至可以与产品业务结合,比如笔者最近在做的用户评分系统,便会从用户行为中抽取一部分数据作为评分依据。
用户行为数据采集系统需求背景
在早期的产品开发中,后端研发人员每人负责一个摊子,虽然也会做些数据采集的事情,但是基本上只针对自己的功能,各做各的。通常做法是,根据产品经理提出的数据需求,设计一个结构化的数据表来存储数据,然后开个REST API给前端,用来上报数据;前端负责在相应的位置埋点,按照协商好的数据格式上报给后端。
随着业务的发展,这样的做法暴露了很多问题,给前后端都带来了混乱,主要表现在:前端四处埋点,上报时调用的API不统一,上报的数据格式不统一;后端数据分散在多个数据表中,与业务逻辑耦合严重。
于是,我们考虑做一个统一的用户行为数据采集系统,基本的原则是:统一上报方式、统一数据格式、数据集中存储、尽可能全量采集。具体到实现上,归纳起来主要要解决三个问题:
- 采什么。搞清楚需要什么数据,抽象出一个统一的数据格式。
- 前端怎么采。解决前端如何有效埋点、全量采集的问题。
- 后端怎么存。解决数据集中存储、易于分析的问题。
用户行为数据采集系统实现步骤
采什么
用户在前端UI上的操作,大多数表现为两类:
- 第一类,打开某个页面,浏览其中的信息,然后点击感兴趣的内容进一步浏览;
- 第二类,打开某个页面,根据UI的提示输入相关信息,然后点击提交。
其行为可以归纳为三种:浏览、输入和点击(在移动端,有时也表现为滑动)。其中,浏览和点击是引起页面变化和逻辑处理的重要事件,输入总是与点击事件关联在一起。
因此,浏览和点击便是我们要采集的对象。对于浏览,我们关注的是浏览了哪个页面,以及与之相关的元数据。对于点击,我们关注的是点击了哪个页面的哪个元素,与该元素相关联的其他元素的信息,以及相关的元数据。
- 页面,在Android与IOS上使用View名称来表示,在Web页面上使用URL(hostname+pathname)来表示。
- 元素,使用前端开发中的UI元素id来表示。
- 与元素相关联的其他元素信息,指的是与“点击”相关联的输入/选择信息,比如在上面的注册页面中,与“提交”按钮相关联的信息有手机号、验证码、姓名。
- 元数据,是指页面能提供的其他有用信息,比如URL中的参数、App中跳转页面时传递的参数等等,这些数据往往都是很重要的维度信息。
除了这些页面中的数据信息,还有两个重要的维度信息:用户和时间。
- 用户维度,用来关联同一用户在某个客户端上的行为,采用的方案是由后端生成一个随机的UUID,前端拿到后自己缓存,如果是登录用户,可以通过元数据中的用户id来关联;
- 时间维度,主要用于数据统计,考虑到前端可能延迟上报,前端上报时会加上事件的发生时间(目前大多数正常使用的移动端,时间信息应该是自动同步的)。
综合起来,将前端上报的数据格式定义如下。uuid、event_time、page是必填字段,element是点击事件的必填字段,attrs包含了上述的元数据、与元素相关联的其他元素的信息,是动态变化的。
{ "uuid": "2b8c376e-bd20-11e6-9ebf-525499b45be6", "event_time": "2016-12-08T18:08:12", "page": "www.example.com/poster.html", "element": "register", "attrs": { "title": "test", "user_id": 1234 } }
而针对不同客户端的不同事件,通过不同的REST API来上报,每个客户端只需调用与自己相关的两个API即可。
REST API | 说明 |
/user_action/web/pv | 上报Web页面的浏览事件 |
/user_action/ios/pv | 上报IOS页面的浏览事件 |
/user_action/android/pv | 上报Android页面的浏览事件 |
/user_action/web/click | 上报Web页面的点击事件 |
/user_action/ios/click | 上报IOS页面的点击事件 |
/user_action/android/click | 上报Android页面的点击事件 |
前端怎么采
整理好数据格式和上报方式后,前端的重点工作便是如何埋点。
- 传统埋点方式。在需要上报的位置组织数据、调用API,将数据传给后端,比如百度统计、google analysis都是这样做的。这是最常用的方式,缺点是需要在代码里嵌入调用,与业务逻辑耦合在一起。
- 全埋点。近几年,一些新的数据公司提出了“无埋点”的概念,通过在底层hook所有的点击事件,将用户的操作尽量多的采集下来,因此也可以称为“全埋点”。这种方式无需嵌入调用,代码耦合性弱,但是会采集较多的无用数据,可控性差。
经过调研,结合我们自己的业务,形成了这样几点设计思路:
- hook底层的点击事件来做数据上报,在上报的地方统一做数据整理工作。
- 通过UI元素的属性值来设置是否对该元素的点击事件上报。
- 通过UI元素的属性值来设置元素的关联关系,用于获取上述的“与元素相关联的其他元素的信息”。
我们首先在Web的H5页面中做了实践,核心的代码很简单。第一,在页面加载时绑定所有的click事件,上报页面浏览事件数据。第二,通过user_action_id属性来表示一个元素是否需要上报点击事件,通过user_action_relation属性来声明当前元素被关联到哪个元素上面,具体代码实现不解释,很简单。
$(d).ready(function() { // 页面浏览上报 pvUpload({page: getPageUrl()}, $.extend({title: getTitle()}, getUrlParams())); // 绑定点击事件 $(d).bind('click', function(event) { var $target = $(event.target); // 查找是否是需要上报的元素 var $ua = $target.closest('[user_action_id]'); if ($ua.length > 0) { var userActionId = $ua.attr('user_action_id'); var userActionRelation = $("[user_action_relation=" + userActionId + "]"); var relationData = []; // 查找相关联的元素的数据信息 if (userActionRelation.length > 0) { userActionRelation.each(function() { var jsonStr = JSON.stringify({ "r_placeholder_element": $(this).get(0).tagName, 'r_placeholder_text': $(this).text() }); jsonStr = jsonStr.replace(/placeholder/g, $(this).attr('id')); jsonStr = JSON.parse(jsonStr); relationData.push(jsonStr); }); } // 点击事件上报 clickUpload({page: getPageUrl(), element: userActionId}, $.extend({title: getTitle()}, getUrlParams(), relationData)); } }); });
上述代码可以嵌入到任何HTML页面,然后只要在对应的元素中进行申明就好了。举个例子,
<div> <div> <textarea id="answer" cols="30" rows="10" user_action_relation="answer-submit"></textarea> </div> <button user_action_id="answer-submit">提 交</button> </div>
后端怎么存
数据进入后台后,首先接入Kafka队列中,采用生产消费者模式来处理。这样做的好处有:
- 第一,功能分离,上报的API接口不关心数据处理功能,只负责接入数据;
- 第二,数据缓冲,数据上报的速率是不可控的,取决于用户使用频率,采用该模式可以一定程度地缓冲数据;
- 第三,易于扩展,在数据量大时,通过增加数据处理Worker来扩展,提高处理速率。
除了前端上报的数据内容外,我们还需要在后端加入一些其他的必要信息。在数据接入Kafka队列之前,需要加入五个维度信息:客户端类型(Web/Android/IOS)、事件类型(浏览/点击)、时间、客户端IP和User Agent。在消费者Worker从Kafka取出数据后,需要加入一个名为event_id的字段数据,具体含义等下解释。因此,最后存入的数据格式便如下所示:
{ "uuid": "2b8c376e-bd20-11e6-9ebf-525499b45be6", "event_time": "2016-12-08T18:08:12", "page": "www.example.com/poster.html", "element": "register", "client_type": 0, "event_type": 0, "user_agent": "Mozilla/5.0 (Linux; Android 5.1; m3 Build/LMY47I) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/37.0.0.0 Mobile MQQBrowser/6.8 TBS/036887 Safari/537.36 MicroMessenger/6.3.31.940 NetType/WIFI Language/zh_CN", "ip": "59.174.196.123", "timestamp": 1481218631, "event_id": 12, "attrs": { "title": "test", "user_id": 1234 } }
再来看event_id的含义。前端传过来的一组组数据中,通过page和element可以区分出究竟是发生了什么事件,但是这些都是前端UI的名称,大部分是开发者才能看懂的语言,因此我们需要为感兴趣的事件添加一个通俗易懂的名称,比如上面的数据对应的事件名称为“在海报页面中注册”。将page+element、事件名称进行关联映射,然后将相应的数据记录id作为event id添加到上述的数据中,方便后期做数据分析时根据跟event id来做事件聚合。做这件事有两种方式:一种是允许相关人员通过页面进行配置,手动关联;一种是前端上报时带上事件名称,目前这两种方式我们都在使用。
最后,来看看数据存储的问题。传统的关系型数据库在存储数据时,采用的是行列二维结构来表示数据,每一行数据都具有相同的列字段,而这样的存储方式显示不适合上面的数据格式,因为我们无法预知attrs中有哪些字段数据。象用户行为数据、日志数据都属于半结构化数据,所谓半结构化数据,就是结构变化的结构化数据(WIKI中的定义),适合使用NoSQL来做数据存储。我们选用的是ElasticSearch来做数据存储,主要基于这么两点考虑:
- Elasticsearch是一个实时的分布式搜索引擎和分析引擎,具有很强的数据搜索和聚合分析能力。
- 在这之前我们已经搭建了一个ELK日志系统,可以复用Elasticsearch集群做存储,也可以复用Kibana来做一些基础的数据分析可视化。
使用Elasticsearch来做数据存储,最重要的是两件事:建立Elasticsearch的映射模板、批量插入。Elasticsearch会根据插入的数据自动建立缺失的index和doc type,并对字段建立mapping,而我们要做的创建一个dynamic template,告诉Elasticsearch如何自动建立,参考如下。批量插入,可以通过Elasticsearch的bulk API轻松解决。
"user_action_record": { "order": 0, "template": "user_action_record_*", "settings": { }, "mappings": { "_default_": { "dynamic_templates": [{ "string_fields": { "mapping": { "type": "string", "fields": { "raw": { "index": "not_analyzed", "ignore_above": 256, "type": "string" } } }, "match_mapping_type": "string" } }], "properties": { "timestamp": { "doc_values": true, "type": "date" } }, "_all": { "enabled": false } } } }
四、ELK日志系统
本章重点探讨数据采集层中的ELK日志系统。
日志,指的是后台服务中产生的log信息,通常会输入到不同的文件中,比如Django服务下,一般会有nginx日志和uWSGI日志。这些日志分散地存储在不同的机器上,取决于服务的部署情况了。如果我们依次登录每台机器去查阅日志,显然非常繁琐,效率也很低,而且也没法进行统计和检索。因此,我们需要对日志进行集中化管理,将所有机器上的日志信息收集、汇总到一起。完整的日志数据具有非常重要的作用:
- 信息查找。通过检索日志信息,定位相应的bug,找出解决方案。
- 服务诊断。通过对日志信息进行统计、分析,了解服务器的负荷和服务运行状态,找出耗时请求进行优化等等。
- 数据分析。如果是格式化的log,可以做进一步的数据分析,统计、聚合出有意义的信息,比如根据请求中的商品id,找出TOP10用户感兴趣商品。
ELK是一套开源的集中式日志数据管理的解决方案,由Elasticsearch、Logstash和Kibana三个系统组成。最初我们建设ELK日志系统的目的是做数据分析,记得第一个需求是期望利用nginx的日志,从API请求的参数中挖掘出用户的位置分布信息。后来该系统在追踪恶意刷量、优化耗时服务等方面都发挥了重要作用,而且随着对Elasticsearch的认知加深,我们将其应用到了其他方面的数据存储和分析中。 本文的重点是结合自身实践来介绍如何使用ELK系统、使用中的问题以及如何解决,文中涉及的ELK版本是:Elasticsearch 2.3、Logstash 2.3、Kibana 4。
ELK整体方案
ELK中的三个系统分别扮演不同的角色,组成了一个整体的解决方案。Logstash是一个ETL工具,负责从每台机器抓取日志数据,对数据进行格式转换和处理后,输出到Elasticsearch中存储。Elasticsearch是一个分布式搜索引擎和分析引擎,用于数据存储,可提供实时的数据查询。Kibana是一个数据可视化服务,根据用户的操作从Elasticsearch中查询数据,形成相应的分析结果,以图表的形式展现给用户。
ELK的安装很简单,可以按照“下载->修改配置文件->启动”方法分别部署三个系统,也可以使用docker来快速部署。具体的安装方法这里不详细介绍,我们来看一个常见的部署方案,如下图所示,部署思路是:
- 第一,在每台生成日志文件的机器上,部署Logstash,作为Shipper的角色,负责从日志文件中提取数据,但是不做任何处理,直接将数据输出到Redis队列(list)中;
- 第二,需要一台机器部署Logstash,作为Indexer的角色,负责从Redis中取出数据,对数据进行格式化和相关处理后,输出到Elasticsearch中存储;
- 第三,部署Elasticsearch集群,当然取决于你的数据量了,数据量小的话可以使用单台服务,如果做集群的话,最好是有3个以上节点,同时还需要部署相关的监控插件;
- 第四,部署Kibana服务,提供Web服务。
在前期部署阶段,主要工作是Logstash节点和Elasticsearch集群的部署,而在后期使用阶段,主要工作就是Elasticsearch集群的监控和使用Kibana来检索、分析日志数据了,当然也可以直接编写程序来消费Elasticsearch中的数据。
在上面的部署方案中,我们将Logstash分为Shipper和Indexer两种角色来完成不同的工作,中间通过Redis做数据管道,为什么要这样做?为什么不是直接在每台机器上使用Logstash提取数据、处理、存入Elasticsearch?
首先,采用这样的架构部署,有三点优势:第一,降低对日志所在机器的影响,这些机器上一般都部署着反向代理或应用服务,本身负载就很重了,所以尽可能的在这些机器上少做事;第二,如果有很多台机器需要做日志收集,那么让每台机器都向Elasticsearch持续写入数据,必然会对Elasticsearch造成压力,因此需要对数据进行缓冲,同时,这样的缓冲也可以一定程度的保护数据不丢失;第三,将日志数据的格式化与处理放到Indexer中统一做,可以在一处修改代码、部署,避免需要到多台机器上去修改配置。
其次,我们需要做的是将数据放入一个消息队列中进行缓冲,所以Redis只是其中一个选择,也可以是RabbitMQ、Kafka等等,在实际生产中,Redis与Kafka用的比较多。由于Redis集群一般都是通过key来做分片,无法对list类型做集群,在数据量大的时候必然不合适了,而Kafka天生就是分布式的消息队列系统。
Logstash
在官方文档中,Deploying and Scaling Logstash一文详细介绍了各种Logstash的部署架构,下图是与我们上述方案相吻合的架构。Logstash由input、filter和output三部分组成,input负责从数据源提取数据,filter负责解析、处理数据,output负责输出数据,每部分都有提供丰富的插件。Logstash的设计思路也非常值得借鉴,以插件的形式来组织功能,通过配置文件来描述需要插件做什么。我们以nginx日志为例,来看看如何使用一些常用插件。
配置nginx日志格式
首先需要将nginx日志格式规范化,便于做解析处理。在nginx.conf文件中设置:
log_format main '$remote_addr "$time_iso8601" "$request" $status $body_bytes_sent "$http_user_agent" "$http_referer" "$http_x_forwarded_for" "$request_time" "$upstream_response_time" "$http_cookie" "$http_Authorization" "$http_token"'; access_log /var/log/nginx/example.access.log main;
nginx日志–>>Logstash–>>消息队列
这部分是Logstash Shipper的工作,涉及input和output两种插件。input部分,由于需要提取的是日志文件,一般使用file插件,该插件常用的几个参数是:
- path,指定日志文件路径。
- type,指定一个名称,设置type后,可以在后面的filter和output中对不同的type做不同的处理,适用于需要消费多个日志文件的场景。
- start_position,指定起始读取位置,“beginning”表示从文件头开始,“end”表示从文件尾开始(类似tail -f)。
- sincedb_path,与Logstash的一个坑有关。通常Logstash会记录每个文件已经被读取到的位置,保存在sincedb中,如果Logstash重启,那么对于同一个文件,会继续从上次记录的位置开始读取。如果想重新从头读取文件,需要删除sincedb文件,sincedb_path则是指定了该文件的路径。为了方便,我们可以根据需要将其设置为“/dev/null”,即不保存位置信息。
input { file { type => "example_nginx_access" path => ["/var/log/nginx/example.access.log"] start_position => "beginning" sincedb_path => "/dev/null" } }
output部分,将数据输出到消息队列,以redis为例,需要指定redis server和list key名称。另外,在测试阶段,可以使用stdout来查看输出信息。
# 输出到redis output { if [type] == "example_nginx_access" { redis { host => "127.0.0.1" port => "6379" data_type => "list" key => "logstash:example_nginx_access" } # stdout {codec => rubydebug} } }
消息队列–>>Logstash–>>Elasticsearch
这部分是Logstash Indexer的工作,涉及input、filter和output三种插件。在input部分,我们通过redis插件将数据从消息队列中取出来。在output部分,我们通过elasticsearch插件将数据写入Elasticsearch。
# 从redis输入数据 input { redis { host => "127.0.0.1" port => "6379" data_type => "list" key => "logstash:example_nginx_access" } } output { elasticsearch { index => "logstash-example-nginx-%{+YYYY.MM}" hosts => ["127.0.0.1:9200"] } }
这里,我们重点关注filter部分,下面列举几个常用的插件,实际使用中根据自身需求从官方文档中查找适合自己业务的插件并使用即可,当然也可以编写自己的插件。
- grok,是Logstash最重要的一个插件,用于将非结构化的文本数据转化为结构化的数据。grok内部使用正则语法对文本数据进行匹配,为了降低使用复杂度,其提供了一组pattern,我们可以直接调用pattern而不需要自己写正则表达式,参考源码grok-patterns。grok解析文本的语法格式是*%{SYNTAX:SEMANTIC}*,SYNTAX是pattern名称,SEMANTIC是需要生成的字段名称,使用工具Grok Debugger可以对解析语法进行调试。例如,在下面的配置中,我们先使用grok对输入的原始nginx日志信息(默认以message作为字段名)进行解析,并添加新的字段request_path_with_verb(该字段的值是verb和request_path的组合),然后对request_path字段做进一步解析。
- kv,用于将某个字段的值进行分解,类似于编程语言中的字符串Split。在下面的配置中,我们将request_args字段值按照“&”进行分解,分解后的字段名称以“request_args_”作为前缀,并且丢弃重复的字段。
- geoip,用于根据IP信息生成地理位置信息,默认使用自带的一份GeoLiteCity database,也可以自己更换为最新的数据库,但是需要数据格式需要遵循Maxmind的格式(参考GeoLite),似乎目前只能支持legacy database,数据类型必须是.dat。下载GeoLiteCity.dat.gz后解压, 并将文件路径配置到source中即可。
- translate,用于检测某字段的值是否符合条件,如果符合条件则将其翻译成新的值,写入一个新的字段,匹配pattern可以通过YAML文件来配置。例如,在下面的配置中,我们对request_api字段翻译成更加易懂的文字描述。
filter { grok { match => {"message" => "%{IPORHOST:client_ip} "%{TIMESTAMP_ISO8601:timestamp}" "%{WORD:verb} %{NOTSPACE:request_path} HTTP/%{NUMBER:httpversion}" %{NUMBER:response_status:int} %{NUMBER:response_body_bytes:int} "%{DATA:user_agent}" "%{DATA:http_referer}" "%{NOTSPACE:http_x_forwarder_for}" "%{NUMBER:request_time:float}" "%{DATA:upstream_resopnse_time}" "%{DATA:http_cookie}" "%{DATA:http_authorization}" "%{DATA:http_token}""} add_field => {"request_path_with_verb" => "%{verb} %{request_path}"} } grok { match => {"request_path" => "%{URIPATH:request_api}(?:?%{NOTSPACE:request_args}|)"} add_field => {"request_annotation" => "%{request_api}"} } kv { prefix => "request_args_" field_split => "&" source => "request_args" allow_duplicate_values => false } geoip { source => "client_ip" database => "/home/elktest/geoip_data/GeoLiteCity.dat" } translate { field => request_path destination => request_annotation regex => true exact => true dictionary_path => "/home/elktest/api_annotation.yaml" override => true } }
Elasticsearch
Elasticsearch承载了数据存储和查询的功能,这里主要介绍实际生产中的问题和方法:
关于集群配置,重点关注三个参数:
- 第一,discovery.zen.ping.unicast.hosts,Elasticsearch默认使用Zen Discovery来做节点发现机制,推荐使用unicast来做通信方式,在该配置项中列举出Master节点。
- 第二,discovery.zen.minimum_master_nodes,该参数表示集群中可工作的具有Master节点资格的最小数量,默认值是1。为了提高集群的可用性,避免脑裂现象(所谓脑裂,就是同一个集群中的不同节点,对集群的状态有不一致的理解。),官方推荐设置为(N/2)+1,其中N是具有Master资格的节点的数量。
- 第三,discovery.zen.ping_timeout,表示节点在发现过程中的等待时间,默认值是3秒,可以根据自身网络环境进行调整,一定程度上提供可用性。
discovery.zen.ping.unicast.hosts: ["master1", "master2", "master3"] discovery.zen.minimum_master_nodes: 2 discovery.zen.ping_timeout: 10
关于集群节点,第一,节点类型包括:候选Master节点、数据节点和Client节点。通过设置两个配置项node.master和node.data为true或false,来决定将一个节点分配为什么类型的节点。第二,尽量将候选Master节点和Data节点分离开,通常Data节点负载较重,需要考虑单独部署。
关于内存,Elasticsearch默认设置的内存是1GB,对于任何一个业务部署来说,这个都太小了。通过指定ES_HEAP_SIZE环境变量,可以修改其堆内存大小,服务进程在启动时候会读取这个变量,并相应的设置堆的大小。建议设置系统内存的一半给Elasticsearch,但是不要超过32GB。参考官方文档。
关于硬盘空间,Elasticsearch默认将数据存储在/var/lib/elasticsearch路径下,随着数据的增长,一定会出现硬盘空间不够用的情形,此时就需要给机器挂载新的硬盘,并将Elasticsearch的路径配置到新硬盘的路径下。通过“path.data”配置项来进行设置,比如“path.data: /data1,/var/lib/elasticsearch,/data”。需要注意的是,同一分片下的数据只能写入到一个路径下,因此还是需要合理的规划和监控硬盘的使用。
关于Index的划分和分片的个数,这个需要根据数据量来做权衡了,Index可以按时间划分,比如每月一个或者每天一个,在Logstash输出时进行配置,shard的数量也需要做好控制。
关于监控,笔者使用过head和marvel两个监控插件,head免费,功能相对有限,marvel现在需要收费了。另外,不要在数据节点开启监控插件。
Kibana
Kibana提供的是数据查询和显示的Web服务,有丰富的图表样板,能满足大部分的数据可视化需求,这也是很多人选择ELK的主要原因之一。UI的操作没有什么特别需要介绍的,经常使用就会熟练,这里主要介绍经常遇到的三个问题。
查询语法
在Kibana的Discover页面中,可以输入一个查询条件来查询所需的数据。查询条件的写法使用的是Elasticsearch的Query String语法,而不是Query DSL,参考官方文档query-string-syntax,这里列举其中部分常用的:
- 单字段的全文检索,比如搜索args字段中包含first的文档,写作 args:first;
- 单字段的精确检索,比如搜索args字段值为first的文档,写作 args: “first”;
- 多个检索条件的组合,使用 NOT, AND 和 OR 来组合,注意必须是大写,比如 args:(“first” OR “second”) AND NOT agent: “third”;
- 字段是否存在,_exists_:agent表示要求agent字段存在,_missing_:agent表示要求agent字段不存在;
- 通配符:用 ? 表示单字母,* 表示任意个字母。
错误“Discover: Request Timeout after 30000ms”
这个错误经常发生在要查询的数据量比较大的情况下,此时Elasticsearch需要较长时间才能返回,导致Kibana发生Timeout报错。解决这个问题的方法,就是在Kibana的配置文件中修改elasticsearch.requestTimeout一项的值,然后重启Kibana服务即可,注意单位是ms。
疑惑“字符串被分解了”
经常在QQ群里看到一些人在问这样一个问题:为什么查询结果的字段值是正确的,可是做图表时却发现字段值被分解了,不是想要的结果?如下图所示的client_agent_info字段。
得到这样一个不正确结果的原因是使用了Analyzed字段来做图表分析,默认情况下Elasticsearch会对字符串数据进行分析,建立倒排索引,所以如果对这么一个字段进行terms聚合,必然会得到上面所示的错误结果了。那么应该怎么做才对?默认情况下,Elasticsearch还会创建一个相对应的没有被Analyzed的字段,即带“.raw”后缀的字段,在这样的字段上做聚合分析即可。
又会有很多人问这样的问题:为什么我的Elasticsearch没有自动创建带“.raw”后缀的字段?然而在Logstash中输出数据时,设置index名称前缀为“logstash-”就有了这个字段。这个问题的根源是Elasticsearch的dynamic template在捣鬼,dynamic temlate用于指导Elasticsearch如何为插入的数据自动建立Schema映射关系,默认情况下,Logstash会在Elasticsearch中建立一个名为“logstash”的模板,所有前缀为“logstash-”的index都会参照这个模板来建立映射关系,在该模板中申明了要为每个字符串数据建立一个额外的带“.raw”后缀的字段。可以向Elasticsearch来查询你的模板,使用API:GET http://localhost:9200/_template。
以上便是对ELK日志系统的总结介绍,还有一个重要的功能没有提到,就是如何将日志数据与自身产品业务的数据融合起来。
举个例子,在nginx日志中,通常会包含API请求访问时携带的用户Token信息,由于Token是有时效性的,我们需要及时将这些Token转换成真实的用户信息存储下来。这样的需求通常有两种实现方式,一种是自己写一个Logstash filter,然后在Logstash处理数据时调用;另一种是将Logstash Indexer产生的数据再次输出到消息队列中,由我们自己的脚本程序从消息队列中取出数据,做相应的业务处理后,输出到Elasticsearch中。目前,团队对ruby技术栈不是很熟悉,所以我们采用了第二种方案来实施。
当前,我们的数据增长相对缓慢,遇到的问题也有限,随着数据量的增加,未来一定会遇到更多的挑战,也可以进一步探索ELK。
五、微信分享追踪系统
本篇重点探讨数据采集层中的微信分享追踪系统。微信分享,早已成为移动互联网运营的主要方向之一,以Web H5页面(下面称之为微信海报)为载体,利用微信庞大的好友关系进行传播,实现宣传、拉新等营销目的。以下图为例,假设有一个海报被分享到了微信中,用户A与B首先看到了这个海报,浏览后又分享给了自己的好友,用户C看到了A分享的海报,浏览后继续分享给了自己的好友。这便形成了一个简单的传播链,其中蕴含了两种数据:
- 行为,指的是用户对微信海报的操作,比如打开、分享。
- 关系,指的是在海报传播过程中,用户之间形成的传播关系,比如用户A将海报传播给C。
这样的数据的意义在于:第一,统计分析各个渠道的海报的传播效果;第二,对传播贡献较大的用户发放微信红包奖励,提高用户的分享积极性。微信分享追踪系统,便是完成对这两种数据的采集和存储。在过去,受到公司业务和运营推广方向的影响,这部分数据驱动了近一半的推广业务。
熟悉微信开发的朋友应该知道,第一,每个微信用户在某个公众号下都拥有一个唯一的open_id,打开微信海报时,可以通过OAuth2静默授权在用户无感知的情况下拿到其open_id;第二,通过微信JS-SDK,我们可以捕捉到用户对海报页面的分享事件;第三,拿到用户在公众号下的open_id后,便可以对该用户发放微信红包了。基于这三点,我们便可以实现相关的数据追踪和分享奖励了**,本文主要是总结我们在微信分享追踪上的方案演进。
首先要说一点的是,其实微信分享追踪系统本身并不复杂,但是与复杂的产品业务结合到一起,就变得越来越复杂了。如何做到将数据逻辑与产品业务逻辑剥离开,以不变应万变,就是这里要说的方案演进了。
早期服务
早期的微信分享追踪系统,笔者曾经在浅谈微信公众号营销背后的技术一文中介绍过,其时序图如下所示。基本流程是:
- 第一,用户打开海报时,通过OAuth2授权,将open_id加入到页面链接中;
- 第二,前端上报浏览事件,需要带上open_id和传播链信息;
- 第三,用户分享时,需要在分享出去的链接中加上传播链信息,所谓传播链信息,就是每个分享过的用户的open_id组合,比如“open_id_1;open_id_2”;
- 第四,上报用户的分享事件,需要带上open_id和传播链信息。后端收到上报数据后,根据不同的功能需求,将数据保存到不同的数据表中,用于后期消费。
随着业务的发展,这个系统暴露出一些问题:
- 随着推广活动的调整,统计和奖励政策也随之变化,比如有的依据一度分享者的分享次数进行奖励,有的依据一度、二度分享者带来的浏览量进行奖励等等,还有需要根据上报的参数不同做不同的处理。所有逻辑都在上报的API请求中处理,来一个需求加一段逻辑,导致该请求的功能不断膨胀,而且一些推广活动已经下线了,相关的逻辑也没有清理掉。
- 参数比较混乱,页面URL中携带了不同的参数,包括微信相关参数、产品相关参数,前端上报时需要携带不同的参数,而前端页面太多,经常搞错。
neo4j的尝试
于是,我们思考,有没有可能在后端直接构建完整的传播信息,后期使用时直接根据条件就可以查询出所需的数据,前端上报时也不用携带传播链信息,我们想到了图形数据库存储技术。
图形数据库是一种非关系型数据库,它应用图形理论存储实体之间的关系信息。在文章开头的那张传播图中,用户的行为数据其实可以归结为用户与海报之间的关系数据,这样,这个系统其实就包含两种实体:用户、海报,三种关系:用户打开海报、用户分享海报、用户之间的传播。在诸多图形数据库中,我们决定选择比较成熟、文档相对丰富的neo4j来做DEMO。采用neo4j的查询语法,很简单的就可以查询出所需数据,简单示例一下。
# 查询1度分享者 MATCH (u:User) - [:FORWARD] -> (p:Poster) RETURN u # 查询浏览情况 MATCH (u:User) - [:OPEN] -> (p:Poster) RETURN u
下图呈现基于neo4j存储的新系统时序图,在OAuth2授权的重定向过程中,建立User和Poster节点信息,以及二者之间的OPEN关系信息,并且对页面URL计算hash值(去除无用参数信息),然后将用户open_id和URL的hash值加到页面URL中返回给前端。用户分享时,把该用户的open_id作为parent字段值,加到分享链接中,新用户打开该链接时,会根据该值来建立User与User节点之间的SPREAD关系信息。在用户分享的事件中,做一次数据上报,携带open_id和页面URL的hash值即可,后端拿到信息后,便可以建立User与Poster之间的FORWARD关系信息。如此,便可以建立完整的微信分享追踪数据了。
然而,一切并非预期的那么完美,在DEMO过程中,我们发现有两点问题不能很好的满足我们的需求:
- 无法根据时间条件快速查询信息,比如查询出昨天的一度分享者。
- 在查询用户间的关系时,会发生误判。比如在下图所示的传播关系中,UserA和UserC的传播关系是发生在海报PosterA上的,在PosterB上并没有,但是当我们尝试查询二度分享者时,会将UserA->UserC->PosterB误判为二度分享。
# 查询2度分享者 MATCH (u1:User) - [:SPREAD] -> (u2:User) - [:FORWARD] -> (p:Poster) RETURN u2, p
虽然这些问题可以想办法绕过去,比如根据时间建立不同的实体节点等等,但是这样会把数据存储做复杂化,经过权衡,我们暂时搁置了这个方案。
基于用户行为数据采集系统的方案
在第三章用户行为数据采集系统中,曾经提到早期的数据采集服务是分散在各个业务功能中的,后来我们重新构建了统一的用户行为数据采集系统。在完成这个系统后,我们开始考虑将上述的微信分享追踪系统并入其中,主要工作有:
- 数据上报的流程与早期的系统一致,但是更换原有的上报方式,采用用户行为数据采集系统的方案统一上报微信分享的数据;
- 数据接入Kafka后,一方面直接将原始数据存储到Elasticsearch,另一方面,以worker的形式来消费数据,根据相应的业务需求提取出所需的数据存入格式化数据表中,用于统计和奖励活动。当某个推广活动结束后,将其所属的worker停掉即可。
通过这样的改进,我们暂时解决了前端上报混乱和后端业务逻辑膨胀的问题,将数据上报和业务需求隔离开。数据方面,实时数据流在Kafka中,历史数据也在Elasticsearch中有存储;业务需求方面,来了一个新的需求后,我们只需添加一个新的worker来实现消费逻辑,活动结束后停掉worker。
六、数据仓库的建设
本章将重点探讨数据处理层中数据仓库的建设。在第二章:运营数据系统,有提到早期的数据服务中存在不少问题,虽然在做运营Dashboard系统时,对后台数据服务进行了梳理,构建了数据处理的底层公共库等,但是仍然存在一些问题:
- 中间数据流失,计算结果没有共享。比如在很多数据报告中都会对同一个功能进行数据提取、分析,但是都是各自处理一遍,没有对结果进行共享。
- 数据分散在多个数据源,如MySQL、MongoDB、Elasticsearch,很难对多个源的数据进行联合使用、有效组织。
- 每个人都需要非常清楚产品业务逻辑才能正确地提取、处理数据,导致大家都将大量时间耗费在基础数据处理中。
于是,我们考虑建设一个适于分析的数据存储系统,该系统的工作应该包含两部分:
- 第一,根据需求抽象出数据模型;
- 第二,按照数据模型的定义,从各个数据源抽取数据,进行清洗、处理后存储下来。
虽然数据仓库的学术定义有很多版本,而且我们的系统也没有涉及到多部门的数据整合,但是符合上述两个特点的,应该可以归结到数据仓库的范畴了,所以请允许笔者将本文命名为“数据仓库的建设”。
下图所示,为现阶段我们的数据仓库建设方案。
- 数据源。数据主要来源于MySQL和MongoDB中的业务数据、Elasticsearch中的用户行为数据与日志数据;
- ETL。ETL过程通过编写Python脚本来完成,由Airflow负责任务流的管理;
- 数据仓库。建立适于分析的多维数据模型,将形成的数据存入MySQL中,供数据应用层使用。
可以看到,数据仓库本身既不生产数据也不消费数据,只是作为一个中间平台集中存储数据,整个系统实现的重点在于数据建模与ETL过程,这也是日常维护中的重点。
存储选型
将数据落地到哪里是首先要考虑的问题,笔者考虑的因素主要有这么几点:一是数据量大小和增长速度,二是要能实现SQL或者类SQL操作,有多表联合、聚合分析功能,三是团队技术栈。可选的技术方案有MySQL、Oracle和Hive,最终选择了基于MYISAM存储引擎的MySQL,部分原因如下:
要不要Hadoop?
生产业务数据库与用户行为数据增长均比较缓慢,预计在接下来的一年里数据仓库的总存储量不会超过500GB。因此现阶段接入Hadoop的意义不大,强行接入反而会降低工作效率。而且团队主要技术栈是Python,使用Python操作Hadoop本身就会有性能损耗。
为什么是MySQL?
相比Oracle,团队对MySQL更加熟悉,所以笔者更多的考虑是选择MySQL的哪个存储引擎:Infobright vs. myisam vs. innodb。
Infobright引入了列存储方案,高强度的数据压缩,优化的统计计算,但是目前已经没有社区版了,需要收费。
抛开底层存储的区别,myisam与innodb在特性上的区别主要体现在三个方面:
- 第一,引用的一致性,innodb有外键,在一对多关系的表之间形成物理约束,而myisam没有;
- 第二,事务,innodb有事务操作,可以保证一组操作的原子性,而myisam没有;
- 第三,锁级别,innodb支持行锁,而myisam只支持表锁。
对于外键与事务,并不是数据仓库需要的,而且数据仓库是读多写少的,myisam的查询性能优于innodb,因此myisam成为首选。
数据建模
根据数据分析的需求抽象出合适的数据模型,是数据仓库建设的一个重要环节。所谓数据模型,就是抽象出来的一组实体以及实体之间的关系,而数据建模,便是为了表达实际的业务特性与关系所进行的抽象。
数据建模是一个很宽泛的话题,有很多方法论值得研究,具体到业务上不同行业又会有不同的建模手法。这里主要结合我们的实践来简单地谈一些认识和方法。
数据建模方法
目前业界有很多数据建模的方法,比如范式建模法、维度建模法等等。
- 遵循三范式,我们在做业务数据库设计时经常会用到,这种方法对业务功能进行抽象,方便功能扩展,但是会额外增加分析的复杂度,因此笔者更倾向于维度建模法。
- 维度建模法,是Kimball最先提出的概念,将数据抽象为事实表与维度表两种,而根据二者之间的关系将整体的模型划分为星型模型与雪花模型两种。这种建模方法的优势在于,根据各个维度对数据进行了预处理,比如按照时间维度进行预先的统计、分类等等,可以提高数据分析应用时的效率,是适于分析的一种方法。
维度建模法相关概念
具体来看看几个概念:
- 维度表与事实表。维度表,描述的是事物的属性,反映了观察事物的角度。事实表,描述的是业务过程的事实数据,是要关注的具体内容,每行数据对应一个或多个度量事件。比如,分析“某地区某商品某季度的销量”,就是从地区、商品、时间(季度)三个角度来观察商品的销量,维度表有地区表、商品表和时间表,事实表为销量表。在销量表中,通过键值关联到三个维度表中,通过度量值来表示对应的销量,因此事实表通常有两种字段:键值列、度量值列。
- 星型模型与雪花模型。两种模型表达的是事实表与维度表之间的关系。当所有需要的维度表都直接关联到事实表时,看上去就是一颗星星,称之为星型模型;当有一个或多个维表没有直接关联到到事实表上,而是通过其他维度表连接到事实表上时,看上去就是一颗雪花,称之为雪花模型。二者的区别在于,雪花模型一定程度上降低了信息冗余度,但是合适的冗余信息能有效的帮助我们提高查询效率,因此,笔者更倾向于星型模型。
基本的维度建模思路
维度建模的基本思路可以归纳为这么几点:
- 第一,确定主题,即搞清楚要分析的主题是什么,比如上述的“某地区某商品某季度的销量”;
- 第二,确定分析的维度,准备从哪几个角度来分析数据;
- 第三,确定事实表中每行的数据粒度,比如时间粒度细化到季度就可以了;
- 第四,确定分析的度量事件,即数据指标是什么。
举个例子,业务场景是:一款做连锁企业招聘工作的产品,比如为麦当劳的所有连锁门店招聘员工,现在要分析“每家门店的招聘情况如何?”。
结合具体业务,我们引入
- 六个维度:时间维度、地区维度、品牌维度、门店维度、职位维度、申请渠道;
- 数据指标上,主要有申请工作人数、申请工作次数、聘用人数、拒绝人数,每个指标分别有增量值和总量值两种;
- 数据粒度上,时间维度细分到以小时为单位,地区维度细分到市一级。
下图所示便是相应的星型模型,有三点值得一提:
- 可以看到我们只建立了四张维度表,地区维度和渠道维度是直接以字符串的形式放到事实表中的。这是维度设计中经常遇到的一个问题:如果这个维度只有一个属性,那么是作为单独的一张表还是作为事实表的一部分?其实并没有完全对与错的答案,只有是否适合自己的答案。这里,城市与渠道的信息并不会发生变化,所以放入事实表中可以避免联合查询。
- 建立了统一的时间维度,可以支持各种时间统计方案,避免在查询时进行时间值运算。
- 在品牌维度、门店维度、职位维度三张表中,都有prod_xxxx_id的字段,其值是产品业务数据库中相应数据的id,作用是为了与业务数据库中的信息进行同步。当业务数据库中的相关信息发生变化时,会通过ETL来更新数据仓库中的信息,因此我们需要这样的一个字段来进行唯一标识。
ETL
ETL这块,由于前期我们做了不少工作来构建底层数据分析公共库,能有效的帮助我们进行数据抽取与处理。因此,现阶段还没有引入诸如Kettle这样的开源工具,主要采用编写Python脚本来实现。这里主要谈谈增量更新机制与任务流管理两个问题的策略。
增量更新机制
增量更新的背景是这样的:第一,上面有提到,对于可变的维度表,我们添加了prod_xxxx_id字段来唯一标识,实现信息覆盖更新。对于事实表,为了反映历史状态,表中的数据通常是不可逆的,只有插入操作,没有删除或者修改操作,表示在过去一段时间内完成的事实业务数据,更新的方法就是插入新的数据。第二,ETL通常是近实时的,需要依赖schedule触发更新,因此每次需要更新的信息就是上一次更新时间与当前时间之间的变化数据。笔者采用的策略是:
- 建立一张temp表,表中有last_update_time与etl_name两个字段;
- 每次更新时,首先查询出相应的etl_name的最近一条记录,取其中的last_update_time作为起始时间,取当前时间为结束时间;
- 抽取数据源中在这段时间内变化的数据,作为ETL过程的输入,进行处理;
- 更新成功时,插入一条数据,last_update_time为当前时间。
Airflow任务流管理系统
在早期数据服务中,我们主要依靠crontab来运行各个任务,随着业务增多,任务的管理变得越来越吃力,体现在以下几方面:
- 查看任务的执行时间和进展不方便。每次需要查看某个任务的执行情况时,都要登录到服务器上去查看命令行的执行时间、log在哪里,通过ps来查看当前进程是否在运行等等。
- 任务跑失败后,没有通知与重试。
- 任务之间的依赖关系无法保证,完全靠预估,然后在crontab里设定执行时间间隔,经常出现上游还没有处理完,下游就启动了,导致脏数据的产生。
于是,我们开始考虑引入一个任务流管理系统,基本想法是:第一,要能解决上述的问题;第二,最好能与Python友好的兼容,毕竟团队的主要技术栈是Python。
经过调研,发现Airflow是当前最适合我们的。Airflow是Airbnb公司开源的一款工作流管理系统,基于Python编写,兼容crontab的schedule设置方法,可以很简单的描述任务之间的逻辑与依赖,并且提供了可视化的WebUI用于任务管理与查看,任务失败时可以设置重试与邮件通知。这里贴一张官方的截图来一睹其风采。
Airflow有三个重要的概念:DAG、Task和Operator。DAG(directed acyclic graphs),有向无环图,用来表示任务的依赖结构;Task表示一个具体的任务节点;Operator表示某个Task的执行体是什么,比如BashOperator是执行一个Bash脚本,PythonOperator是执行一段python代码等等。
使用Airflow,首先要编写对应的任务脚本,通常脚本需要做三件事:第一,描述DAG的属性(比如schedule、重试策略等),第二,描述Task属性(比如Operator是什么),第三,描述Task的依赖情况。进一步的认识可以参考官方文档。
以上便是现阶段我们的数据仓库发展与建设方法,虽然比较简单,但是目前基本能满足需求。随着数据规模的增长和业务的复杂化,未来还有很多路要走:如何合理的建模?如何有效的利用数据?如何提高数据分析效率?期待更多的挑战!
七、附录
文章来源:
Bruce
https://bruce.blog.csdn.net/
评论留言