Storm的进程、线程、任务数和线程安全问题研究(一篇明白)
基于Storm
1.0.6
版本
本文查阅了许多资料并反复琢磨,应该能做到一篇明白Storm的worker进程、线程、任务关系
一、概念
-
进程:即worker进程,工作进程,Java进程,或称JVM进程
-
线程:即executor,由进程产生,用于执行任务
-
任务:即task,任务就是被线程执行的东西,数据会流过task,线程作用于task将数据处理,可以理解task就是数据的处理工具,或者是数据的处理逻辑的工具
-
进程数:即worker数
-
线程数:即executor数
-
任务数:即task数
-
component:
spout
和bolt
的统称,后续也会用bolt的prepare/execute方法统称spout的open/nextTuple方法
worker进程可以比喻为工厂
,executor线程可以比喻为工人
,task任务可以比喻为车床
工厂里有很多工人,每个工人操作一到多台车床,车床有原料流过,原料从车床处理完毕流到下一个车床。
进程、线程是比较好理解的,任务(task)就比较难理解,什么是task,task应该是数据的处理逻辑的集合,线程只负责执行,至于怎么执行,是task规定的,而task怎么知道如何处理数据,它应该是委托给Bolt/Spount的,这就是我们书写的Bolt/Spount里头的处理逻辑,而task其实就是包装了bolt/spout实例,线程执行了task的run方法,run方法里转而调用execute/nextTuple方法
三者关系
以下知识点细节都是正确的 (目前的知识认为是对的)
- bolt/spout 实例数一定等于task数
- 每个task都有自己的 bolt 或 spout 的实例,所以 task数等于 bolt/spout 实例数
- 线程数 <= task数,一个线程执行一到多个task
- component 中的成员变量在 prepare/execute(open/nextTuple) 方法中被操作,是线程安全的
- component 中的静态变量在 prepare/execute(open/nextTuple) 方法中被操作,是非线程安全的
- 相同 worker 可能存在类型相同的多个实例 (相同的Class,有多个实例)
- 可能出现一个线程服务于多个task的情况
- 一个executor总会有一个线程来运行executor所有的task,这说明task在executor内部是串行执行的。(An executor always has one thread that it uses for all of its tasks, which means that tasks run serially on an executor.)
- 一个线程服务于多个task,而不是多个线程操作同一个task,因此component的成员变量不存在资源竞争
- 因为component中的静态变量是类共享的,而同一个worker里存在相同类型component的多个实例,因此是可能出现多个线程操作同一个静态变量,造成线程安全问题
- 一个storm集群可以运行多个拓扑(Topology),资源是隔离的
- 一个拓扑可能由多个worker为其服务
- 具体某个worker服务于特定的拓扑
- 线程(executor)是进程(worker)产生的
- 线程(executor)服务于固定的某个类的component(不会出现跨类型)
- task数 是 component 的实例数, 证据是 StormUI 界面可看到解释:A task is an instance of a Bolt or Spout
- 设置线程数的地方,setBolt/setSpout的parallelism_hint参数
- 设置task数的地方,setNumTasks,不设置则等于线程数
TopologyBuilder#setBolt
源码中对parallelism_hint
的解释是错的
详解:
Note that as of Storm 0.8 the parallelism_hint parameter now specifies the initial number of executors (not tasks!) for that bolt.
翻译:Storm 0.8版本之后,使用 parallelism_hint参数来指定该bolt的executor的初始数量,注意不是task数!
之所以源码中的注释是错的,是因为注释没及时修改过来
源码:
/**
* Define a new bolt in this topology with the specified amount of parallelism.
*
* @param id the id of this component. This id is referenced by other components that want to consume this bolt's outputs.
* @param bolt the bolt
* @param parallelism_hint the number of tasks that should be assigned to execute this bolt. Each task will run on a thread in a process somewhere around the cluster.
* @return use the returned object to declare the inputs to this component
* @throws IllegalArgumentException if {@code parallelism_hint} is not positive
*/
public BoltDeclarer setBolt(String id, IRichBolt bolt, Number parallelism_hint) throws IllegalArgumentException {}
补充:
有博客说XxxxBolt的实例数会受到 不大于workerNum 规则的影响,是错的,动手尝试了setWorkerNum(1)并且设置bolt数量是2,结果StormUI上依旧显示taskNum为2,显示bolt的数量是2,所以观点站不住脚
配置拓扑的并发度
如何制定task数、线程数和进程数
注意Storm术语中“并发度(parallelism)“特定地用来描述所谓的”parallelism hint”,是一个意思
Storm目前配置项的优先级是:外部component特定的配置>内部component特定的配置项>拓扑特定的配置项>storm.yaml>defaults.yaml
一、非运行时方式指定
用途 | 描述 | 配置文件方式进行设置 | 代码方式进行设置 |
---|---|---|---|
worker进程数量 | 拓扑在集群机器上运行时需要的worker进程数据量 | Config#TOPOLOGY_WORKERS | Config#setNumWorkers |
每个组件需要创建的executor数量 | executor线程的数量 | 没有单独配置项 | TopologyBuilder#setSpout() 和 TopologyBuidler#setBolt() Storm 0.8之后使用 parallelism_hint参数来指定executor的初始数量(不是task数,源码中的描述可能由于未及时更新,是错的,详细参考文章末尾的参考资料 ) |
task数量 | 每个组件需要创建的task数量 | Config#TOPOLOGY_TASKS | ComponentConfigurationDeclarer#setNumTasks() |
二、运行时方式指定
1、使用命令行
即拓扑已经在storm平台跑起来了, 如何动态改
- task数,改不了
- executor数,使用
-e
- worker数,使用
-n
例子:
# 重新配置“mytopology”拓扑使用5个worker进程[原来是2个]
# "blue-spout"这个spout使用3个[原来有2个]executor
# "yellow-bolt"使用10个[原来有6个]executor
$ storm rebalance mytopology -n 5 -e blue-spout=3 -e yellow-bolt=10
2、使用 StormUI
(略)
参考资料
- 官方1.0.6版本文档 (本文的storm基于
1.0.1
版本,因官方没这版本的文档,查看1.0.6
) - Understanding the Parallelism of a Storm Topology (这里是中文版) 非常推荐
- Thread safe of storm bolt (stack overflow)
- Thread safety of bolts 似乎是谷歌的论坛,回复问题的人Nathan正是Storm作者!!!
- 应该是官方文档的翻译 非常全面的参考资料
- Storm消息可靠性保证 如何保证消息的可靠性,有代码例子
- How is this word count bolt thread safe?
- 源码中的一些注释
// setBolt带上的并行度的解释
/**
* Define a new bolt in this topology with the specified amount of parallelism.
*
* @param id the id of this component. This id is referenced by other components that want to consume this bolt's outputs.
* @param bolt the bolt
* @param parallelism_hint the number of tasks that should be assigned to execute this bolt. Each task will run on a thread in a process somewhere around the cluster.
* @return use the returned object to declare the inputs to this component
* @throws IllegalArgumentException if {@code parallelism_hint} is not positive
*/
public BoltDeclarer setBolt(String id, IRichBolt bolt, Number parallelism_hint) throws IllegalArgumentException {}
// parallelism_hint解释:分配用于执行该bolt的任务数,感觉有点奇怪,怎么是"任务数",不是线程数吗?为什么用了task这个单词
// setNumTasks,代码如下解释
/**
* How many instances to create for a spout/bolt. A task runs on a thread with zero or more
* other tasks for the same spout/bolt. The number of tasks for a spout/bolt is always
* the same throughout the lifetime of a topology, but the number of executors (threads) for
* a spout/bolt can change over time. This allows a topology to scale to more or less resources
* without redeploying the topology or violating the constraints of Storm (such as a fields grouping
* guaranteeing that the same value goes to the same task).
*/
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/135331.html