【Flink图计算源码解析】:Flink图的存储数据结构_hxcaifly的博客-CSDN博客_flink图计算


本站和网页 https://blog.csdn.net/hxcaifly/article/details/86256170 的作者无关,不对其内容负责。快照谨为网络故障时之索引,不代表被搜索网站的即时页面。

【Flink图计算源码解析】:Flink图的存储数据结构_hxcaifly的博客-CSDN博客_flink图计算
【Flink图计算源码解析】:Flink图的存储数据结构
hxcaifly
于 2019-01-10 22:41:17 发布
2006
收藏
分类专栏:
Flink
Flink原理和应用
版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/hxcaifly/article/details/86256170
版权
Flink
同时被 2 个专栏收录
31 篇文章
8 订阅
订阅专栏
Flink原理和应用
29 篇文章
16 订阅
订阅专栏
承接上篇:https://blog.csdn.net/hxcaifly/article/details/86147736
1.图的基本组成
图由边和顶点构成:
Edge: 边。每条边是Tuple3<K,K,V>的数据结构,保存了边的开始顶点Id,边的目的顶点Id和边的值。Vertex: 顶点。每个顶点是Tuple2<K, V>的数据结构。保存了顶点的Id,和顶点的值。
2. 顶点和边的代码定义
2.1. 顶点的定义
package org.apache.flink.graph;
import org.apache.flink.api.java.tuple.Tuple2;
/**
* 图的顶点。由ID和值构成
* 对于没有值的顶点,利用 {@link org.apache.flink.types.NullValue} 作为值类型。
* @param <K>
* @param <V>
*/
public class Vertex<K, V> extends Tuple2<K, V> {
private static final long serialVersionUID = 1L;
public Vertex(){}
public Vertex(K k, V val) {
this.f0 = k;
this.f1 = val;
public K getId() {
return this.f0;
public V getValue() {
return this.f1;
public void setId(K id) {
this.f0 = id;
public void setValue(V val) {
this.f1 = val;
顶点的定义其实很简洁,顶点就是由顶点Id和顶点值构成。所以就利用了Tuple2<K, V> 数据结构。其中K表示顶点Id的类型;V表示顶点的值类型。
2.2. 边的定义
package org.apache.flink.graph;
import org.apache.flink.api.java.tuple.Tuple3;
/**
* 边是呈现两个顶点{@link Vertex vertices}之间的连线
* 对于边如果没有值,用{@link org.apache.flink.types.NullValue}作为值类型。
* @param <K> 源顶点和目的顶点的key类型
* @param <V> 边的值类型
*/
public class Edge<K, V> extends Tuple3<K, K, V> {
private static final long serialVersionUID = 1L;
public Edge(){}
public Edge(K source, K target, V value) {
this.f0 = source;
this.f1 = target;
this.f2 = value;
/**
* Reverses the direction of this Edge.
* @return a new Edge, where the source is the original Edge's target
* and the target is the original Edge's source.
*/
public Edge<K, V> reverse() {
return new Edge<>(this.f1, this.f0, this.f2);
public void setSource(K source) {
this.f0 = source;
public K getSource() {
return this.f0;
public void setTarget(K target) {
this.f1 = target;
public K getTarget() {
return f1;
public void setValue(V value) {
this.f2 = value;
public V getValue() {
return f2;
边是表示两个顶点之间的连线。那么怎样才能记录一条边呢。很容易想到,需要记录边的出发点(起始顶点)和目的点(目标顶点)。然后两个顶点之间的连线是有值的,这个值可以表示不同的含义,比如可以表示权重,或者表示距离。
所以,边的数据结构是继承了Tuple3<K, K, V>。其中,K表示顶点的类型,分别是起始顶点和目标顶点;V表示边的值类型。
2.3. 图的定义
我们已经知道图(Graph)是由边和顶点构成的,那么我们不难想象Graph这个类的定义。
package org.apache.flink.graph;
/**
* 定义由边{@link Edge edges}和顶点{@link Vertex vertices}组成的图。
* * @see org.apache.flink.graph.Edge
* @see org.apache.flink.graph.Vertex
* * * @param <K> 顶点的key类型
* @param <VV> 顶点的值类型
* @param <EV> 边的值类型
*/
@SuppressWarnings("serial")
public class Graph<K, VV, EV> {
// 1. 执行环境
private final ExecutionEnvironment context;
// 2.图的顶点数据集
private final DataSet<Vertex<K, VV>> vertices;
// 3. 图的边数据集
private final DataSet<Edge<K, EV>> edges;
// 省略了方法操作等
上述代码是图的定义中的成员变量部分。
ExecutionEnvironment :图的执行上下文,图的计算其实也是一个单独的任务。需要依靠于这个执行上下文去获取一些相关执行配置等。DataSet<Vertex<K, VV>> : 顶点的数据集。DataSet<Edge<K, EV>>: 边的数据集。
Graph类中定义了很多构造方法,这些构造方法主要是要适应从不同类型的数据源中读取图数据集。比如从csv文件中读取,或者说只提供了边数据集时,我们需要从顶点数据中提取出所有的顶点。这一块可能会随着业务的变化会有所拓展,因为宗旨就是方便不同的开发者能够多样化读入数据源。 下面列出一部分构建函数:
/**
* 从两个DataSets中创建图:顶点和边
* @param vertices 顶点的DataSet.
* @param edges 边的DataSet.
* @param context flink执行环境.
*/
protected Graph(DataSet<Vertex<K, VV>> vertices, DataSet<Edge<K, EV>> edges, ExecutionEnvironment context) {
this.vertices = vertices;
this.edges = edges;
this.context = context;
/**
* 从顶点和边的集合数据中创建图
* @param vertices a Collection of vertices.
* @param edges a Collection of edges.
* @param context the flink execution environment.
* @return the newly created graph.
*/
public static <K, VV, EV> Graph<K, VV, EV> fromCollection(Collection<Vertex<K, VV>> vertices,
Collection<Edge<K, EV>> edges, ExecutionEnvironment context) {
return fromDataSet(context.fromCollection(vertices),
context.fromCollection(edges), context);
/**
* 从边的集合数据创建图。图的顶点会自动根据边来创建。并且顶点的值被设为Null
* Creates a graph from a Collection of edges.
* Vertices are created automatically and their values are set to
* NullValue.
* @param edges a Collection of edges.
* @param context the flink execution environment.
* @return the newly created graph.
*/
public static <K, EV> Graph<K, NullValue, EV> fromCollection(Collection<Edge<K, EV>> edges,
ExecutionEnvironment context) {
return fromDataSet(context.fromCollection(edges), context);
/**
* 从边的数据集中构建出图
* @param edges a Collection of edges.
* @param vertexValueInitializer a map function that initializes the vertex values.
* It allows to apply a map transformation on the vertex ID to produce an initial vertex value.
* @param context the flink execution environment.
* @return the newly created graph.
*/
public static <K, VV, EV> Graph<K, VV, EV> fromCollection(Collection<Edge<K, EV>> edges,
final MapFunction<K, VV> vertexValueInitializer, ExecutionEnvironment context) {
return fromDataSet(context.fromCollection(edges), vertexValueInitializer, context);
/**
* 从顶点数据集和边的数据集中创建图。
* Creates a graph from a DataSet of vertices and a DataSet of edges.
* @param vertices a DataSet of vertices.
* @param edges a DataSet of edges.
* @param context the flink execution environment.
* @return the newly created graph.
*/
public static <K, VV, EV> Graph<K, VV, EV> fromDataSet(DataSet<Vertex<K, VV>> vertices,
DataSet<Edge<K, EV>> edges, ExecutionEnvironment context) {
return new Graph<>(vertices, edges, context);
/**
* 从边的DataSet中创建图。
* NullValue.
* @param edges a DataSet of edges.
* @param context the flink execution environment.
* @return the newly created graph.
*/
public static <K, EV> Graph<K, NullValue, EV> fromDataSet(
DataSet<Edge<K, EV>> edges, ExecutionEnvironment context) {
DataSet<Vertex<K, NullValue>> vertices = edges
.flatMap(new EmitSrcAndTarget<>())
.name("Source and target IDs")
.distinct()
.name("IDs");
return new Graph<>(vertices, edges, context);
Graph类里还定义了一些和图相关的基本操作方法,比如:移除顶点操作,反转图的所有边方向等。因为整个代码太长,我们没有必要一个个去看,后面会结合应用案例去熟悉相关操作方法。
3.图的简单操作案例
package org.apache.flink.graph.examples;
import org.apache.flink.api.common.ProgramDescription;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.graph.Edge;
import org.apache.flink.graph.EdgeJoinFunction;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.Triplet;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.examples.data.EuclideanGraphData;
import java.io.Serializable;
/**
* 这个例子主要是展示怎样应用Gelly的{@link Graph#getTriplets()}和
* {@link Graph#joinWithEdges(DataSet, EdgeJoinFunction)}方法
* 给定一个有方向的,无权重的图。顶点值表示平面中的点
* 返回一个加权图,其中边权重等于SRC和TRG顶点值之间的欧几里得距离。
* <p>输入文件是纯文本文件,必须按以下格式设置:
* <ul>
* <li> 顶点由顶点和顶点值表示,并用换行符分隔。由两个用逗号分隔的双精度数组成的值。,
* 例如: <code>1,1.0,1.0\n2,2.0,2.0\n3,3.0,3.0\n</code> 定义了由3个顶点组成的数据集
* <li> 边由一对srcvertexid,trgvertexid由逗号分隔。
* 他们是由换行符分隔的边。
* 例如: <code>1,2\n1,3\n</code> 定义了两条边 1-2 和 1-3.
* </ul>
* <p>Usage <code>EuclideanGraphWeighing <vertex path> <edge path> <result path></code><br>
* If no parameters are provided, the program is run with default data from
* {@link EuclideanGraphData}
*/
@SuppressWarnings("serial")
public class EuclideanGraphWeighing implements ProgramDescription {
public static void main(String[] args) throws Exception {
if (!parseParameters(args)) {
return;
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// 1. 读取顶点数据
DataSet<Vertex<Long, Point>> vertices = getVerticesDataSet(env);
// 2. 读取边数据
DataSet<Edge<Long, Double>> edges = getEdgesDataSet(env);
// 3. 构建图
Graph<Long, Point, Double> graph = Graph.fromDataSet(vertices, edges, env);
// 4. 边的值将会是源顶点和目的顶点之间的欧式距离
DataSet<Tuple3<Long, Long, Double>> edgesWithEuclideanWeight = graph.getTriplets()
.map(new MapFunction<Triplet<Long, Point, Double>, Tuple3<Long, Long, Double>>() {
@Override
public Tuple3<Long, Long, Double> map(Triplet<Long, Point, Double> triplet)
throws Exception {
Vertex<Long, Point> srcVertex = triplet.getSrcVertex();
Vertex<Long, Point> trgVertex = triplet.getTrgVertex();
return new Tuple3<>(srcVertex.getId(), trgVertex.getId(),
srcVertex.getValue().euclideanDistance(trgVertex.getValue()));
});
// 5. 结果图
Graph<Long, Point, Double> resultedGraph = graph.joinWithEdges(edgesWithEuclideanWeight,
new EdgeJoinFunction<Double, Double>() {
public Double edgeJoin(Double edgeValue, Double inputValue) {
return inputValue;
});
// 从最后的结果图中还原出边
DataSet<Edge<Long, Double>> result = resultedGraph.getEdges();
// emit result
if (fileOutput) {
result.writeAsCsv(outputPath, "\n", ",");
// since file sinks are lazy, we trigger the execution explicitly
env.execute("Euclidean Graph Weighing Example");
} else {
result.print();
@Override
public String getDescription() {
return "Weighing a graph by computing the Euclidean distance " +
"between its vertices";
// *************************************************************************
// 数据类型
// *************************************************************************
/**
* 一个检点的二维点坐标数据结构
*/
public static class Point implements Serializable {
public double x, y;
public Point() {}
public Point(double x, double y) {
this.x = x;
this.y = y;
public double euclideanDistance(Point other) {
return Math.sqrt((x - other.x) * (x - other.x) + (y - other.y) * (y - other.y));
@Override
public String toString() {
return x + " " + y;
// ******************************************************************************************************************
// 工具方法
// ******************************************************************************************************************
private static boolean fileOutput = false;
private static String verticesInputPath = null;
private static String edgesInputPath = null;
private static String outputPath = null;
/**
* 解析命令行参数
* @param args
* @return
*/
private static boolean parseParameters(String[] args) {
if (args.length > 0) {
if (args.length == 3) {
fileOutput = true;
verticesInputPath = args[0];
edgesInputPath = args[1];
outputPath = args[2];
} else {
System.out.println("Executing Euclidean Graph Weighing example with default parameters and built-in default data.");
System.out.println("Provide parameters to read input data from files.");
System.out.println("See the documentation for the correct format of input files.");
System.err.println("Usage: EuclideanGraphWeighing <input vertices path> <input edges path>" +
" <output path>");
return false;
return true;
/**
* 加载顶点数据
* @param env
* @return
*/
private static DataSet<Vertex<Long, Point>> getVerticesDataSet(ExecutionEnvironment env) {
// 1. 如果穿了外部读取文件
if (fileOutput) {
// 读取csv文件数据,并且映射为Vertex数据。
return env.readCsvFile(verticesInputPath)
.lineDelimiter("\n")
.types(Long.class, Double.class, Double.class)
.map(new MapFunction<Tuple3<Long, Double, Double>, Vertex<Long, Point>>() {
@Override
public Vertex<Long, Point> map(Tuple3<Long, Double, Double> value) throws Exception {
return new Vertex<>(value.f0, new Point(value.f1, value.f2));
});
} else {
// 读取默认的数据
return EuclideanGraphData.getDefaultVertexDataSet(env);
private static DataSet<Edge<Long, Double>> getEdgesDataSet(ExecutionEnvironment env) {
if (fileOutput) {
return env.readCsvFile(edgesInputPath)
.lineDelimiter("\n")
.types(Long.class, Long.class)
.map(new MapFunction<Tuple2<Long, Long>, Edge<Long, Double>>() {
@Override
public Edge<Long, Double> map(Tuple2<Long, Long> tuple2) throws Exception {
return new Edge<>(tuple2.f0, tuple2.f1, 0.0);
});
} else {
return EuclideanGraphData.getDefaultEdgeDataSet(env);
上述代码重点利用了图的joinWithEdges操作:
/**
* 边的DataSet和输入的DataSet,通过sourceIds和targetIds的复合键进行Join。
* 并且利用自定义的转移函数应用到匹配的记录值上去。
* 输入Dataset的前两个字段是用来作为join的key。
* @param inputDataSet 用来join的输入Dataset
* Tuple3的前两个字段是作为join操作的复合键。然后第三个字段会作为参数传进转移函数里面去。
* @param edgeJoinFunction 应用的转移函数。
* 第一个参数是当前的边值,第二个参数是从输入Dataset传进来的匹配Tuple3的值
* @param <T> 输入Tuple3数据集的第三个字段。
* @return 返回一个新图,然后边的值根据edgeJoinFunction跟新了
*/
public <T> Graph<K, VV, EV> joinWithEdges(DataSet<Tuple3<K, K, T>> inputDataSet,
final EdgeJoinFunction<EV, T> edgeJoinFunction) {
DataSet<Edge<K, EV>> resultedEdges = this.getEdges()
.coGroup(inputDataSet).where(0, 1).equalTo(0, 1)
.with(new ApplyCoGroupToEdgeValues<>(edgeJoinFunction))
.name("Join with edges");
return new Graph<>(this.vertices, resultedEdges, this.context);
coGroup操作执行的逻辑定义在Graph类文件的子类中:ApplyCoGroupToEdgeValues。
private static final class ApplyCoGroupToEdgeValues<K, EV, T>
implements CoGroupFunction<Edge<K, EV>, Tuple3<K, K, T>, Edge<K, EV>> {
private Edge<K, EV> output = new Edge<>();
private EdgeJoinFunction<EV, T> edgeJoinFunction;
public ApplyCoGroupToEdgeValues(EdgeJoinFunction<EV, T> mapper) {
this.edgeJoinFunction = mapper;
@Override
public void coGroup(Iterable<Edge<K, EV>> edges, Iterable<Tuple3<K, K, T>> input,
Collector<Edge<K, EV>> collector) throws Exception {
final Iterator<Edge<K, EV>> edgesIterator = edges.iterator();
final Iterator<Tuple3<K, K, T>> inputIterator = input.iterator();
if (edgesIterator.hasNext()) {
if (inputIterator.hasNext()) {
final Tuple3<K, K, T> inputNext = inputIterator.next();
output.f0 = inputNext.f0;
output.f1 = inputNext.f1;
output.f2 = edgeJoinFunction.edgeJoin(edgesIterator.next().f2, inputNext.f2);
collector.collect(output);
} else {
collector.collect(edgesIterator.next());
最终程序运行的结果为:
(1,4,4.242640687119285)
(2,5,4.242640687119285)
(4,6,2.8284271247461903)
(7,9,2.8284271247461903)
(1,2,1.4142135623730951)
(2,3,1.4142135623730951)
(3,5,2.8284271247461903)
(5,7,2.8284271247461903)
(5,9,5.656854249492381)
(6,8,2.8284271247461903)
(7,8,1.4142135623730951)
(2,4,2.8284271247461903)
(4,5,1.4142135623730951)
(8,9,1.4142135623730951)
(6,7,1.4142135623730951)
其实图的操作很简单,只要把图的数据源定义清楚,后面的操作其实就像跟操作普通的Tuple结构数据是一样的。
备注:上述案例的源码请参考flink源码的flink-gelly-examples模块。
hxcaifly
关注
关注
点赞
收藏
打赏
评论
【Flink图计算源码解析】:Flink图的存储数据结构
承接上篇:https://blog.csdn.net/hxcaifly/article/details/861477361.图的基本组成图由边和顶点构成:Edge: 边。每条边是Tuple3&amp;amp;amp;amp;amp;amp;lt;K,K,V&amp;amp;amp;amp;amp;amp;gt;的数据结构,保存了边的开始顶点Id,边的目的顶点Id和边的值。Vertex: 顶点。每个顶点是Tuple2&amp;amp;amp;amp;amp;
复制链接
扫一扫
专栏目录
Flink从入门到精通
12-26
<p>本课程从0到1进行Flink的介绍,对当下最火的大数据技术Flink进行深入解析,并结合实际电商项目快速掌握Flink的开发技术。</p>
Flink系列之大数据分布式计算引擎设计实现剖析
最新发布
luoyepiaoxue2014的博客
12-01
83
Flink系列之大数据分布式计算引擎设计实现剖析
评论 3
您还未登录,请先
登录
后发表或查看评论
Flink之数据类型详解
lixinkuan的博客
06-21
4802
一、数据类型支持
Flink支持非常完善的数据类型,数据类型的描述信息都是由TypeInformation定义,比较常用的TypeInformation有BasicTypeInfo、TupleTypeInfo、CaseClassTypeInfo、PojoTypeInfo类等。TypeInformation主要作用是为了在Flink系统内有效地对数据结构类型进行管理,能够在分布式计算过程中对数据的类型进行管理和推断。同时基于对数据的类型信息管理,Flink内部对数据存储也进行了相应的性能优化。
Flink
Flink源码-3-作业数据结构
csdn问鼎
05-06
186
大图
示例代码
public class WorldCount {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
...
【Flink图计算源码解析】开篇:Flink图计算总览
hxcaifly的博客
01-09
5587
文章目录1. 图计算的作用2. 本专题的写作目的3. Flink Gelly引擎总览3.1. Gelly的源码结构1. Graph的存储数据结构2. 图的分类3. 图的验证以及指标4. 图的生成器5. Library6.图的迭代操作7. examples案例4. 后记
1. 图计算的作用
哲学上说事物之间普遍存在联系的,通常来说可以将事物看作图的顶点,事物间的联系看作图的边,典型的场景:
对应于...
Flink生态系列-图计算篇
枫叶的落寞的博客
05-07
905
敬请期待。。。
flink中各种图的原理(还没搞完)
微电子学与固体电子学
06-10
238
https://lulaoshi.info/flink/chapter-system-design/task-resource.html
Flink(1.11)概述——含架构图
平平无奇小码农~
11-29
690
文章目录一、Flink概述**1.1** **Flink and Spark****1.2** **Flink or Spark**二、wordCount案例2.1 批处理2.2 有界流2.3 无界流2.4 打成jar包上传到flink运行三、运行环境3.1 standalone模式3.1.1 前端界面3.1.2 命令行方式提交3.1.3 standalone模式HA配置:启动:3.2 Yarn模式3.2.1 配环境,集成hadoop3.2.2 直接运行3.2.3
图文解说Flink的应用场景和功能
延宝小白马的博客
08-15
3835
一 Flink是什么
Apache Flink 是一个框架和分布式处理引擎,用于在无边界和有边界数据流上进行有状态的计算。Flink 能在所有常见的集群环境中运行,并能以内存速度和任意规模进行计算。
二 为什么要选择Flink
流式处理的特点是无界和实时的,不需要针对整个数据集进行处理,而是对系统传输的每个数据执行操作,因此流数据处理能更真实的反应我们的生活和工作方式;通常用在实时处理方面。
我们工作的目标是低延迟的、高吞吐的、高可用的实时应用。
三 哪些行业需要处理流数据
电商
flink 架构图
an13654067079的博客
06-24
295
flink 架构图
Flink实时计算运用(一)概述与应用场景
热门推荐
Mirson - 多一些分享, 多一些进步
04-24
1万+
1. Flink概述
Flink是什么
Flink是一个面向数据流处理和批处理的分布式开源计算框架。
无界流VS有界流
任何类型的数据都可以形成流数据,比如用户交互记录, 传感器数据,事件日志等等。
Apache Flink 擅长处理无界和有界数据集。 精确的时间控制和有状态的计算,使得 Flink能够运行任何处理无界流的应用。
流数据分为无界流和有界流。
1) 无界流:有定义流的开始,但没有定义流的结束, 会不停地产生数据,无界流采用的是流处理方式。
2) 有界流:有定义流的开始, 也有
Flink01:快速了解Flink:什么是Flink、Flink架构图、Flink三大核心组件、Flink的流处理与批处理、Storm vs SparkStreaming vs Flink
weixin_40612128的博客
03-15
739
一、什么是Flink
Apache Flink 是一个开源的分布式,高性能,高可用,准确的流处理框架。
分布式:表示flink程序可以运行在很多台机器上,
高性能:表示Flink处理性能比较高
高可用:表示flink的稳定性和可用性是比较好的。
准确的:表示flink可以保证处理数据的准确性。
Flink支持流处理和批处理,虽然我们刚才说了flink是一个流处理框架,但是它也支持批处理。
其实对于flink而言,它是一个流处理框架,批处理只是流处理的一个极限特例而已。
看一下这张图
左边是数据源,从这里面
Flink 的运行架构详细剖析
yuan_more的博客
11-04
526
1. Flink 程序结构
Flink 程序的基本构建块是流和转换(请注意,Flink 的 DataSet API 中使用的 DataSet 也是内部流 )。从概念上讲,流是(可能永无止境的)数据记录流,而转换是将一个或多个流作为一个或多个流的操作。输入,并产生一个或多个输出流。
Flink 应用程序结构就是如上图所示:
Source: 数据源,Flink 在流处理和批处理上的 source 大概有 4 类:基于本地集合的 source、基于文件的 source、基于网络套接字的 source、自
Flink入门第十八课:DataStream的数据结构以及Flink的连接操作
曹利荣的博客
01-06
1098
1、DataStream
代表一个运行在多个分区上的并行流,转换操作都是逐条的。
可以从 StreamExecutionEnvironment 或者env.addSource(SourceFunction) 获得。
常用操作:map(),flatMap(),filter()、rebalance()、 broadcaseted()
2、KeyedStream
用来表示根据指定的key进行分组的数据流,执行任何转换操作都将返回DataStream。
可以通过调用D...
Flink简介、基本原理、架构图
白色咖啡
08-20
664
Flink是一款支持有状态运算的流计算引擎。支持有状态运算是指数据的计算过程中可以保存计算的中间过程状态,比如我们要计算一个整数数据流的求和,那么我们就需要一个中间变量把数据流中的每一项数据加到这个变量上。而这个变量就是计算的中间状态。Flink框架会帮你管理状态的保存和复原。流计算是指我们要针对一个数据流进行源源不断的计算,产出实时的结果。比如说我们曾经看到过天猫淘宝的双11大屏,上面有当前的GMV成交量的数据,这个数字在屏幕上疯狂的跳动。这就是通过流计算产出的实时GMV成交量数据。
Flink的计算方式
初心江湖路的博客
05-09
1489
首先看一张来自官网的Flink运行时架构图
看图说话,先总结下Flink计算引擎的几个抽象:
管理抽象:
JobManager (Master) 负责调度任务执行、负责指挥进行检查点、负责任务失败容错恢复等。
TaskManager (Worker) 负责具体任务的执行、缓冲和交换数据流等。
每个管理者都对应着独立的JVM进程。
执行抽象:
Task,本质上都回归到线程执行具体的task。
我们先不论资源管理方式。可以看到,资源被划分为细粒度的Slot。Flink Job也跟Spark Job一样,均以T
常用图算法实现--Flink
crazy_scott的博客
01-03
1576
使用Flink实现PageRank、强连通分量、单源最短路径、二分图匹配…
PageRank
主要参考官网的example
算法流程
每次计算当前每个网页的转移概率,计算下一时刻到达每个网页的概率并加入随机跳转
数据准备
pages.txt
准备一些顶点,例如1-15
links.txt
准备一些连接边(也就是链接数):
1 2
1 15
2 3
2 4
2 5
2 6
2 7
3 13
4 2...
Flink最全知识图谱,思维导图,看完才算搞懂flink
songjifei的专栏
03-09
2409
该图谱由 Apache Flink Committer 执笔,四位 PMC 成员审核,将 Flink 9 大技术版块详细拆分,突出重点内容并搭配全面的学习素材。看完这份图谱,才算真的搞懂 Flink!
如何获取?
关注「大数据极客」微信公众号,后台回复关键字“图谱”即可下载 PDF 版本,内含大量补充链接,一键点击即可查看相关素材!
...
Flink1.12源码解读——ExecutionGraph执行图构建过程
ws0owws0ow的博客
03-01
598
Apache Flink作为国内最火的大数据计算引擎之一,自身支持高吞吐,低延迟,exactly-once语义,有状态流等特性,阅读源码有助加深对框架的理解和认知。
Flink分为四种执行图,本章主要解析Execution的生成计划
本章及后续源码解读环境以生产为主:运行模式:OnYarn,HA模式:ZK,Mode:Streaming,关键逻辑解释我会备注到代码上(灰色字体)勿忽视。
此章节涉及的组件和框架的代码依赖部分大数据生态包括Yarn,Akka,ZK等。建议有此基础上阅读会提高可阅读性和实现理
Flink 原理与实现:理解 Flink 中的计算资源
weixin_33754065的博客
11-24
602
本文所讨论的计算资源是指用来执行 Task 的资源,是一个逻辑概念。本文会介绍 Flink 计算资源相关的一些核心概念,如:Slot、SlotSharingGroup、CoLocationGroup、Chain等。并会着重讨论 Flink 如何对计算资源进行管理和隔离,如何将计算资源利用率最大化等等。理解 Flink 中的计算资源对于理解 Job 如何在...
深入理解Apache Flink核心技术
CSDN 人工智能
02-18
4010
作者:李呈祥
作者简介:Intel BigData Team软件工程师,主要关注大数据计算框架与SQL引擎的性能优化,Apache Hive Committer,Apache Flink Contributor。
责任编辑:仲浩(zhonghao@csdn.net)
文章来源:《程序员》2月期
版权声明:本文为《程序员》原创文章,未经允许不得转载,订阅2016年《程...
“相关推荐”对你有帮助么?
非常没帮助
没帮助
一般
有帮助
非常有帮助
提交
©️2022 CSDN
皮肤主题:技术黑板
设计师:CSDN官方博客
返回首页
hxcaifly
CSDN认证博客专家
CSDN认证企业博客
码龄5年
暂无认证
111
原创
3万+
周排名
37万+
总排名
32万+
访问
等级
4230
积分
391
粉丝
290
获赞
152
评论
840
收藏
私信
关注
热门文章
【有监督分箱】方法一:卡方分箱
41693
【NLP技术】:NLP简单介绍
18284
【有监督分箱】方法二: Best-KS分箱
12652
如何解决数据不平衡问题
10247
Flink提交任务(总篇)——执行逻辑整体分析
9767
分类专栏
招聘
1篇
生活杂记
3篇
任务调度系统
4篇
JVM
15篇
Java设计模式
23篇
Flink原理和应用
29篇
Java多线程编程
9篇
Java网络编程
5篇
后台开发
8篇
Python
4篇
数据挖掘和机器学习
15篇
NLP
3篇
java
70篇
Flink
31篇
Hadoop
4篇
Spark
10篇
数据库
7篇
JVM
6篇
大数据平台
6篇
计算机基础知识
6篇
java网络编程
5篇
Scala
1篇
编译原理
生活
3篇
golang
1篇
Spring
5篇
spring boot
1篇
ElasticSearch
1篇
微服务
3篇
leetcode
1篇
存储
1篇
Web开发
1篇
最新评论
【任务调度系统第四篇】:Quartz的原理
稳定的穷:
解读的太好了,感觉必须学完spring源码看这个终于不废劲了
【Flink图计算源码解析】开篇:Flink图计算总览
canaryW:
如果对gelly感兴趣可以先从pregel论文看起,整明白了分布式图计算的逻辑然后到github看看能不能contribute
【Flink图计算源码解析】开篇:Flink图计算总览
JZL要努力啊:
你好,作为研究生,对flink感兴趣,希望能对其做些研究,请问前辈有什么建议吗?
【任务调度系统第二篇】:XXL Job源码分析
excusme:
说的springboot怎么例子用的是还是spring的xml
[Flink原理介绍第四篇】:Flink的Checkpoint和Savepoint介绍
晴初1997:
牛的
您愿意向朋友推荐“博客详情页”吗?
强烈不推荐
不推荐
一般般
推荐
强烈推荐
提交
最新文章
蚂蚁金服消费金融风险管理技术团队期待你的加入
【Web开发】:HTTP响应状态码总结
【存储】:《Column-Stores vs. Row-Stores》读后感
2020年1篇
2019年94篇
2018年84篇
目录
目录
分类专栏
招聘
1篇
生活杂记
3篇
任务调度系统
4篇
JVM
15篇
Java设计模式
23篇
Flink原理和应用
29篇
Java多线程编程
9篇
Java网络编程
5篇
后台开发
8篇
Python
4篇
数据挖掘和机器学习
15篇
NLP
3篇
java
70篇
Flink
31篇
Hadoop
4篇
Spark
10篇
数据库
7篇
JVM
6篇
大数据平台
6篇
计算机基础知识
6篇
java网络编程
5篇
Scala
1篇
编译原理
生活
3篇
golang
1篇
Spring
5篇
spring boot
1篇
ElasticSearch
1篇
微服务
3篇
leetcode
1篇
存储
1篇
Web开发
1篇
目录
评论 3
被折叠的 条评论
为什么被折叠?
到【灌水乐园】发言
查看更多评论
打赏作者
hxcaifly
你的鼓励将是我创作的最大动力
¥2
¥4
¥6
¥10
¥20
输入1-500的整数
余额支付
(余额:-- )
扫码支付
扫码支付:¥2
获取中
扫码支付
您的余额不足,请更换扫码支付或充值
打赏作者
实付元
使用余额支付
点击重新获取
扫码支付
钱包余额
抵扣说明:
1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。 2.余额无法直接购买下载,可以购买VIP、C币套餐、付费专栏及课程。
余额充值