唯's Blog

笔者是一个热爱编程的 Java 程序员。

0%

用户每次在首页推荐获取10个帖子,下拉刷新,每次刷新产生的帖子记录成日志,日志内容包含10个帖子的id,日志的信息会发送到 MQ ,用于消费。

需求:

  • 给用户展示最近 24 小时曝光 top 100 的帖子,注意24小时是一个滑动窗口:每次读取都是最近24小时的曝光统计结果(不用精确到秒,最大延迟不应该超过10分钟)

  • 线上用户读取结果时,接口时延 P99 应该控制在 200ms 以下(忽略高并发问题,假设 qps=1)

  • 数量级:4000 万日志,4 亿条曝光,100万帖子

定义服务

  • 帖子推荐服务

存储

  • 结合数据量大、但是对 QPS 要求不高,接口响应时间要求高。

方案

方案1

  • 消费MQ,按10分钟维度去建立缓存,例如:id_202211251620 ,value就上一个窗口减去24小时前的窗口值,窗口期累积,并比较判断是否能计入top100 的zset,用户取zset就行

方案2

  • 经典 Flink 计算 topn 问题

方案3

  • 两个流 一个曝光流 一个过期流 一个加1 一个减1

架构设计原则

原则是指导思想,就像听党指挥能打胜仗一样,需要熟记于心,做方案评估时挨个对照,看是否满足,以下基本原则很是认可。

  • 合适优于领先

  • 演进优于快成

  • 简单优于复杂

Cola 是什么?

Cola是阿里开源框架,可以说是DDD的实现。

image.png

image.png

测试数据

EDI
  • EDI-14248

  • EDI-14249

  • EDI-14288

  • EDI-14289

  • EDI-14290

  • EDI-14293

  • EDI-14294

PE
  • SAODEV-1425

  • SAODEV-1435

  • SAODEV-1436

  • SAODEV-1440

  • SAODEV-1441

  • SAODEV-1442

  • SAODEV-1443

  • SAODEV-1457

Req

start requirement
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27

{

"fields": {

"customfield_14803": {

"name": "zw35"

},

"timetracking": {

"originalEstimate": "20h"

}

},

"transition": {

"id": "721"

}

}

  • live bug
1
2
3
4
5
6
7
8
9
10
11
12
13



{

"transition": {

"id": "721"

}

}

  • Q: is key project 这个字段目前是否还需要? 影响功能实现具体的设计
Complete Requirement undone
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

{

"transition": {

"id": "731"

},

"fields": {

"customfield_14316": "2022-08-05",

"customfield_13402": "2022-08-05"

}

}

  • error msg:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

{

"errorMessages": [],

"errors": {

"customfield_14316": "Plan Launch Date is required.",

"customfield_13402": "Plan Release Date is required."

}

}

Start Develop
1
2
3
4
5
6
7
8
9
10
11

{

"transition": {

"id": "741"

}

}

Complete Develop undone
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23

{

"transition": {

"id": "751"

},

"fields":{

"customfield_14620": "zw35",

"fixVersions": "123",

"customfield_14621": "jd53"

}



}

  • error msg:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

{

"errorMessages": [],

"errors": {

"customfield_14620": "Developers is required.",

"fixVersions": "Fix Version/s is required.",

"customfield_14621": "Testers is required."

}

}

Release
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

{

"transition": {

"id": "761"

},

"fields": {

"customfield_13404":"2022-08-02 14:27:00"

}

}



Start UAT
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15



{

"transition": {

"id": "771"

}

}



Complete UAT
1
2
3
4
5
6
7
8
9
10
11

{

"transition": {

"id": "781"

}

}

Approve to Launch
1
2
3
4
5
6
7
8
9
10
11

{

"transition": {

"id": "891"

}

}

Launch
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

{

"transition": {

"id": "791"

},

"fields": {

"customfield_14319": "2022-08-02 14:27:00"

}

}

Confirm
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

{

"transition": {

"id": "881"

},

"fields": {

"customfield_14924": "2022-08-02 14:27:00"

}

}

非 SOX 的

Start PRE Testing
1
2
3
4
5
6
7
8
9
10
11

{

"transition": {

"id": "771"

}

}

Complete PRE Testing
1
2
3
4
5
6
7
8
9
10
11

{

"transition": {

"id": "781"

}

}

Approve to Launch
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

{

"transition": {

"id": "791"

},

"fields":{

"customfield_14319":"2022-08-02"

}

}

Cancel
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55

{

"transition": {

"id": "711"

},



"fields": {

"customfield_14700": {



"id":"14400",



"value": "zw35"

}

},

"update": {

"comment": [

{

"add": {



"body": "Bug has been fixed."



}

}

]

}



}



问题

  • Q: is key project 这个字段目前是否还需要? 影响功能实现具体的设计

A: 该字段已取消

  • Save:触发变成 UAT , Accept:触发变成 Ready For Launch? Save 操作会在 Staging 前面的任意环节,此时的Jira 状态不固定的,更新 UAT 状态肯定会失败。

A: Staging validation 触发 UAT

  • 非 SOX 项目允许跳过某些状态,例如跳过 Staging , Released 状态哪个操作去更新?

A: ignore 操作时 去更新 Jira 状态(Ignore Staging 环节,更新 Staging 环节的Jira状态)

  • EM 的情况?跳过 Approve ,状态怎么更新?

A: EM 项目统一在 Launch 以后更新状态

  • Edit Release / 添加 addition 的情况,jira 状态需要回滚么?

A: 不需要,同时 Jira 也不支持

Log-structured 在分布式系统、数据系统都有 Log-Stuctured 结构的应用,大家所熟知的 MongoDB 和 HBase 这类的 NoSQL 数据库,它们的底层存储数据结构其实也是 Log-Structured 结构

解决什么问题

为了解决并发写操作,在高并发下同时更新某个值,需要加锁保证数据一致性,操作安全,那有没有方法可以不用顾及写操作的高并发问题,同时也可以最终获得一个准确的结果呢?答案就是使用 Log-Structured 结构。

Log-structured

Log-Structured结构,有时候也会被称作是 Append-only Sequence of Data ,因为所有的写操作都会不停地添加进这个数据结构中,而不会更新原来已有的值,这也是 Log-Structured 结构的一大特性。

存在的问题

  • 无限制的写入,数组在内存中无限的增长,如何处理?

  • 每次获取结果,都需要遍历一遍数据,时间复杂度非常高,那该怎么优化?

  • 平衡树如何被应用在这里面的

Log-Structurd 结构的优化

  • 拆分成多个 Segment ,Segment 大小固定,Segment 1 写满之后,新的数据将写入新的 Segment 2 中

  • 利用 Compaction 后台线程,将不同的 Segments 合并在一起

  • Compaction 合并完成之后,合并的结果放在了新的 Segment 中,合并前的Segment 便删除

SSTable 和 LSM 树

  • SSTable(Sorted String Table)数据结构是在 Log-Structured 结构的基础上,多加了一条规则,就是所有保存在 Log-Structured 结构里的数据都是键值对,并且键必须是字符串,在经过了 Compaction 操作之后,所有的 Compacted Segment 里保存的键值对都必须按照字符排序。

LSM树

LSM树(Log Structured Merge Tree,结构化合并树)的思想非常朴素,就是将对数据的修改增量保持在内存中,达到指定的大小限制后将这些修改操作批量写入磁盘(由此提升了写性能),是一种基于硬盘的数据结构,与B-tree相比,能显著地减少硬盘磁盘臂的开销。当然凡事有利有弊,LSM树和B+树相比,LSM树牺牲了部分读性能,用来大幅提高写性能。

应用

  • 采用 Lucence 作为后台索引引擎的开源搜索框架,像 ElasticSearch 和 Solr,底层用的就是 LSM 树

  • 有一种特殊情况:所搜索的数据不在保存的数据中,而想要判断数据是否存在该 Segment 中就需要遍历整个 Segment,针对这种情况,搜索引擎底层还加一个 Bloom Filter 这个数据结构。

WAL(Write Ahead Log) 预写日志

  • 本地方法栈

  • 虚拟机栈

  • 程序计数器

  • 方法区(元空间)

容器化技术解决了什么问题?

进程之间环境、资源隔离

  • 沙箱机制,容器虚拟自己的系统

服务器资源合理分配利用

  • 定义服务器中多个服务,某个服务所占用的资源可以选择扩容缩容

符合服务现代敏捷思想,devops 快速迭代

  • 一是让开发环境尽量贴近生产环境,二是快速搭建开发环境。现在Docker可以轻易的让几十个服务在Docker中跑起来,从而大大提升开发效率

服务运行环境保持一致

  • 用的是同一个镜像,容器中环境一致

扩展、迁移简单

*

容器化的底层原理

  • 依赖 Linux Container 技术 ,核心两个组件 Namespace、Cgroups

Namespace

  • 提供资源隔离,定义全新运行环境的能力。linux kernel clone 方法,容器本质还是一个进程

Cgroups

  • 提供限制资源使用上限的能力。例如:CPU、内存、磁盘等

容器化时代:

1、虚拟化解决的问题:是物理层面的隔离

2、容器化解决的问题:是应用层面的隔离

OAuth2 (Open Authorization,开放授权),第三方服务无需知道用户的账号及密码,就可获取到用户的授权信息(Token)

例如:

微信开发平台接入

image.png

Synchronized

1. 概念:

  • 互斥锁,Java内置锁(隐式锁) 非公平 不可中断

2. 应用:

  • 作用方法:同步方法(this)

  • 作用代码块:同步代码块(object)

  • 作用静态方法:同步方法(锁的类对象)

3. 原理:

  • 锁的粒度是对象

  • 锁膨胀

  • 无锁->偏向锁->轻量锁->重量级锁
  • 锁升级不可逆
  • 锁的标志 记录再 对象 markword
  • 锁消除
  • 逃逸分析: -XX:Do..
  • 标量替换
  • 对象可以再栈上分配
  • 锁粗化
  • 避免频繁枷锁
  • 自适应自旋锁

  • 偏向锁

  • 轻量级锁

  • 锁升级

数据库连接池的模块设计

DriverManager 与 DataSource 的区别
  • DriverManger 使用 getConnection 获取数据库的连接,本质是与数据库建立连接

    • 建立数据库连接操作是很耗费资源的操作,产生较大的系统开销
  • 因此有了连接池的引用,使用池化思想:

    • 提前创建好一些连接,供使用
    • 将创建的资源缓存起来,避免频繁的创建连接,等待下一个使用者使用
  • DataSource 的引入

    • 本质是提供数据库连接的服务类(数据源)
    • 对DriverManger进行封装,建立一个中间层(DateSource),将DriverManger创建的连接存入Pool,外部从Pool获取连接。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23





应用程序 -> DriverManager



--------------------------------------

--------------------------------------



DriverManager



应用程序 -> DataSource -> Pool

> * 用了这个中间层(DataSource)我们可以提供某些服务(连接池、分布式事务)

  • DataSource的形式是JNDI (Java Naming Directory Interface)
  • 数据源的概念在应用程序与数据库连接之间插入了一个中间层,进而可以实现连接池以及事务管理,并且以JNDI的形式,也能够以非常方便的形式使用。
小结
  • DataSource 是基于应用程序与 DriverManager 抽象出来的中间层,解耦了应用程序与数据库的实际连接,类似代理模式可以添加更多的功能:连接池、分布式事务等。

    连接池与DataSource是两回事,只是目前市面上实现了DataSource都赋予了连接池、连接池管理的功能。

HikariPool

设计模型
核心类 HikariDataSource 继承自 DataSource
核心成员 HikariPool pool(volatile 保证可见)实现连接池功能 继承自 PoolBase
  • 字段:

  • DataSource 用于创建数据库连接(DriverDataSource Driver创建连接)

  • connectionBag 用于缓存数据库连接

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

/**

设计亮点

快,减少并发 无锁并发 CAS

线程回收时,优先放在之前使用线程 ThreadLocal 方便下次使用 时间局部性原理。

ThreadLocal<List<Connection>>

缓存线程使用 copyOnWriterList 写时复制,写与读同时操作时,不阻塞读

更新 PoolEntry 状态 使用 CAS

*/

  • addConnectionExecutor 单线程线程池 用于建立数据库连接 -> 线程池允许核心线程超时 executor.allowCoreThreadTimeOut(true);

  • closeConnectionExecutor 单线程线程池 用于关闭数据库连接 -> 线程池允许核心线程超时 executor.allowCoreThreadTimeOut(true);

  • houseKeepingExecutorService 调度线程池 内部定时器,用于实现连接的超时淘汰、连接池的补充等工作。

  • addConnectionQueueReadOnlyView -> addConnectionExecutor(线程池) 阻塞队列的集合 用来判断高并发时 超过MaxSize

1
2
3
4
5
6
7
8
9

final int maxPoolSize = config.getMaximumPoolSize();

LinkedBlockingQueue<Runnable> addConnectionQueue = new LinkedBlockingQueue<>(maxPoolSize);

this.addConnectionQueueReadOnlyView = unmodifiableCollection(addConnectionQueue);

this.addConnectionExecutor = createThreadPoolExecutor(addConnectionQueue, poolName + " connection adder", threadFactory, new ThreadPoolExecutor.DiscardOldestPolicy());

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77



* PoolEntryCreator,添加连接的任务,实现创建连接的具体逻辑。



#### Pool entry

POOL_ENTRY 作为一个连接节点

通过提交一个 houseKeepingExecutorService 调度任务,来保证超过最大时间的空闲任务close

#### ConnectionBag

* sharedList CopyOnWriteArrayList(读多写少) 缓存数据库连接(pool entry 保证类(ProxyConnection)

* waiters 待获取连接数量

* threadList 缓存当前线程数据库连接(针对同一 个线程返回获取连接,提升效率ThreadLocal)

* handoffQueue 提升效率,并发频繁的场景,避免遍历 copyOnWirte 拿取缓存数据库连接



##### HikariPool getConnection 返回的是装饰/代理的Connection ,Poolentry中存放的是原生的数据库连接 ,



##### 使用 Javassist (字节码变成) 创建 ProxyFactory 用于创建 HikariProxyConnection (代理模式) 来装饰原生 Connection ,调用 Close 方法时不会真实的关闭close(连接池缓存连接)



###### Javassist 利用 Maven 生命周期 compile 时生成新的class文件



​```XML

<plugin>

<!-- Generate proxies -->

<groupId>org.codehaus.mojo</groupId>

<artifactId>exec-maven-plugin</artifactId>

<version>1.6.0</version>

<executions>

<execution>

<phase>compile</phase>

<!-- phase>generate-test-sources</phase -->

<goals>

<goal>java</goal>

</goals>

</execution>

</executions>

<configuration>

<mainClass>com.zaxxer.hikari.util.JavassistProxyFactory</mainClass>

</configuration>

</plugin>



配置参数
  • maxLifeTime: 15m

    底层执行一个延时任务,延时 maxLifeTime 便执行,给 Connection 打上驱逐标记 poolEntry.markEvicted(),如果当前连接是空闲连接 state; STATE_NOT_IN_USE,则直接真实关闭当前连接。若不是 只有等待连接空闲后,在getConnection 时,判断是否存在evict 标记,进行连接close

  • idleTimeout: 10s

  • minIdle: 1

    底层使用一个定时线程池,执行一个定时任务默认 30ms 执行一次,扫描出空闲连接,并进行清理。(保证剩余的线程)

tip 通过 Semaphore 信号量并发工具 实现线程池的暂停功能
  • 核心思想:将信号量消耗完,获取信号量的最大值,,其余获取消耗量的线程都阻塞
1
2
3
4
5
6
7
8
9

public void suspend()

{

acquisitionSemaphore.acquireUninterruptibly(MAX_PERMITS);

}

tip (PoolEntry 状态字段state)
  • AtomicIntegerFieldUpdater 用于原子性(线程安全)更新类对象中的int字段

JDBCTemplate

1
2
3
4
5

JDBCTemplate (DataSource) -> DataSourceUtil.getConnection(DataSource)

封装事务

tip Collection.unmodifiableCollection()

  • 创建/封装 不可变集合 UnmodifiableCollection ,代理模式,重写修改的方法 抛出异常
1
2
3
4
5
6
7
8
9
10
11
12
13

public boolean add(E e) {

throw new UnsupportedOperationException();

}

public boolean remove(Object o) {

throw new UnsupportedOperationException();

}

在分布式节点处理数据,例如多主复制。存在冲突问题

解决思路

1. 避免冲突

所谓解决问题最根本的方式则是尽可能不让它发生,如果能够在应用层保证对特定数据的请求只发生在一个节点上,这样就没有所谓的“写冲突”了。继续拿上面的协同编辑文档举例,如果我们把每个人的都在填有自己姓名表格的一行里面进行编辑,这样就可以最大程度地保证每个人的修改范围不会有重叠,冲突也就迎刃而解了。

2. 收敛于一致状态

然而,对更新标题这种情况而言,冲突是没法避免的,但还是需要有方法解决。对于单主节点模式而言,如果同一个字段有多次写入,那么最后写入的一定是最新的。ZK、KafkaController、KafkaReplica都有类似Epoch的方式去屏蔽过期的写操作,由于所有的写请求都经过同一个节点,顺序是绝对的,但对于多主节点而言,由于没有绝对顺序的保证,就只能试图用一些方式来决策相对顺序,使冲突最终收敛,这里提到了几种方法:

给每个写请求分配Uniq-ID,例如一个时间戳,一个随机数,一个UUID或Hash值,最终取最高的ID作为最新的写入。如果基于时间戳,则称作最后写入者获胜(LWW),这种方式看上去非常直接且简单,并且非常流行。但很遗憾,文章一开始也提到了,分布式系统没有办法在机器间共享一套统一的系统时间,所以这个方案很有可能因为这个问题导致数据丢失(时钟漂移)。

每个副本分配一个唯一的ID,ID高的更新优先级高于地域低的,这显然也会丢失数据。

当然,我们可以用某种方式做拼接,或利用预先定义的格式保留冲突相关信息,然后由用户自行解决。

3. 用户自行处理

其实,把这个操作直接交给用户,让用户自己在读取或写入前进行冲突解决,这种例子也是屡见不鲜,Github采用就是这种方式。