books/flink-in-action-6.5.md
在 1.9 版本中还剩最后一个扩展库就是 Gelly,本节将带你了解一下 Gelly 的功能以及如何使用。
<!--more-->Gelly 是 Flink 的图 API 库,它包含了一组旨在简化 Flink 中图形分析应用程序开发的方法和实用程序。在 Gelly 中,可以使用类似于批处理 API 提供的高级函数来转换和修改图。Gelly 提供了创建、转换和修改图的方法以及图算法库。
因为 Gelly 是 Flink 项目中库的一部分,它本身不在 Flink 的二进制包中,所以运行 Gelly 项目(Java 应用程序)是需要将 opt/flink-gelly_2.11-1.9.0.jar 移动到 lib 目录中,如果是 Scala 应用程序则需要将 opt/flink-gelly-scala_2.11-1.9.0.jar 移动到 lib 中,接着运行下面的命令就可以运行一个 flink-gelly-examples 项目。
./bin/flink run examples/gelly/flink-gelly-examples_2.11-1.9.0.jar \
--algorithm GraphMetrics --order directed \
--input RMatGraph --type integer --scale 20 --simplify directed \
--output print
接下来可以在 UI 上看到运行的结果如下图所示:
如果是自己创建的 Gelly Java 应用程序,则需要添加如下依赖:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-gelly_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
如果是 Gelly Scala 应用程序,添加下面的依赖:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-gelly-scala_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
引入好依赖后,接着将介绍一下 Gelly 该如何使用。
在 Gelly 中,一个图(Graph)由顶点的数据集(DataSet)和边的数据集(DataSet)组成。图中的顶点由 Vertex 类型来表示,一个 Vertex 由唯一的 ID 和一个值来表示。其中 Vertex 的 ID 必须是全局唯一的值,且实现了 Comparable 接口。如果节点不需要由任何值,则该值类型可以声明成 NullValue 类型。
//创建一个 Vertex<Long,String>
Vertex<Long, String> v = new Vertex<Long, String>(1L, "foo");
//创建一个 Vertex<Long,NullValue>
Vertex<Long, NullValue> v = new Vertex<Long, NullValue>(1L, NullValue.getInstance());
Graph 中的边由 Edge 类型来表示,一个 Edge 通常由源顶点的 ID,目标顶点的 ID 以及一个可选的值来表示。其中源顶点和目标顶点的类型必须与 Vertex 的 ID 类型相同。同样的,如果边不需要由任何值,则该值类型可以声明成 NullValue 类型。
Edge<Long, Double> e = new Edge<Long, Double>(1L, 2L, 0.5);
//反转此 edge 的源和目标
Edge<Long, Double> reversed = e.reverse();
Double weight = e.getValue(); // weight = 0.5
在 Gelly 中,一个 Edge 总是从源顶点指向目标顶点。如果图中每条边都能匹配一个从目标顶点到源顶点的 Edge,那么这个图可能是个无向图。同样地,无向图可以用这个方式来表示。
可以通过以下几种方式创建一个 Graph:
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Vertex<String, Long>> vertices = ...
DataSet<Edge<String, Double>> edges = ...
Graph<String, Long, Double> graph = Graph.fromDataSet(vertices, edges, env);
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Tuple2<String, String>> edges = ...
Graph<String, NullValue, NullValue> graph = Graph.fromTuple2DataSet(edges, env);
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Tuple2<String, Long>> vertexTuples = env.readCsvFile("path/to/vertex/input").types(String.class, Long.class);
DataSet<Tuple3<String, String, Double>> edgeTuples = env.readCsvFile("path/to/edge/input").types(String.class, String.class, Double.class);
Graph<String, Long, Double> graph = Graph.fromTupleDataSet(vertexTuples, edgeTuples, env);
//创建一个具有字符串 Vertex id、Long Vertex 和双边缘的图
Graph<String, Long, Double> graph = Graph.fromCsvReader("path/to/vertex/input", "path/to/edge/input", env)
.types(String.class, Long.class, Double.class);
//创建一个既没有顶点值也没有边值的图
Graph<Long, NullValue, NullValue> simpleGraph = Graph.fromCsvReader("path/to/edge/input", env).keyType(Long.class);
List<Vertex<Long, Long>> vertexList = new ArrayList...
List<Edge<Long, String>> edgeList = new ArrayList...
Graph<Long, Long, String> graph = Graph.fromCollection(vertexList, edgeList, env);
//将顶点值初始化为顶点ID
Graph<Long, Long, String> graph = Graph.fromCollection(edgeList,
new MapFunction<Long, Long>() {
public Long map(Long value) {
return value;
}
}, env);
Gelly 提供了下列方法来查询图的属性和指标:
DataSet<Vertex<K, VV>> getVertices()
//获取边缘数据集
DataSet<Edge<K, EV>> getEdges()
//获取顶点的 id 数据集
DataSet<K> getVertexIds()
DataSet<Tuple2<K, K>> getEdgeIds()
DataSet<Tuple2<K, LongValue>> inDegrees()
DataSet<Tuple2<K, LongValue>> outDegrees()
DataSet<Tuple2<K, LongValue>> getDegrees()
long numberOfVertices()
long numberOfEdges()
DataSet<Triplet<K, VV, EV>> getTriplets()
Graph 转换方式有下面几种方式:
Gelly 内置下列方法以支持对一个图进行节点和边的增加/移除操作:
Graph<K, VV, EV> addVertex(final Vertex<K, VV> vertex)
Graph<K, VV, EV> addVertices(List<Vertex<K, VV>> verticesToAdd)
Graph<K, VV, EV> addEdge(Vertex<K, VV> source, Vertex<K, VV> target, EV edgeValue)
Graph<K, VV, EV> addEdges(List<Edge<K, EV>> newEdges)
Graph<K, VV, EV> removeVertex(Vertex<K, VV> vertex)
Graph<K, VV, EV> removeVertices(List<Vertex<K, VV>> verticesToBeRemoved)
Graph<K, VV, EV> removeEdge(Edge<K, EV> edge)
Graph<K, VV, EV> removeEdges(List<Edge<K, EV>> edgesToBeRemoved)
加入知识星球可以看到上面文章:https://t.zsxq.com/nMR7ufq
本章所讲的内容属于 Flink 的扩展库,包含了 CEP 复杂事件处理、State Processor API、Machine Learning 和 Gelly,各种都有讲解一些样例,但是没有过多深入的讲,但还是希望你可以在书本外自己去扩充这些内容的知识点。