flink自动加全局流水

news/2024/5/5 16:11:21

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.functions import  MapFunction, RuntimeContext, KeyedProcessFunction

import re
import redis


# 创建 StreamExecutionEnvironment 对象
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)

# 读取文件,创建 DataStream 对象
data_stream = env.read_text_file('/root/pyflink/elink_20230603')
# 对每行数据添加字符串 'aaaa'
class MyMapFunction(MapFunction):
   def open(self, runtime_context: RuntimeContext):
     self.r = redis.Redis(host='127.0.0.1', port=6379)

   def map(self,line):
    process_id='';
    bus_seq=''
    if not line.startswith("ES"):
        return
    if '<Serial>' in line:
       pat=re.compile(r"<Serial>(\d+)</Serial>")
       bus_seq=pat.findall(line)
       process_id=line.split()[1]
       self.r.set(process_id,bus_seq[0])
    process_id=line.split()[1]
    if not len(process_id)==6 :
        process_id=line.split()[2]
     
    bus_seq=self.r.get(process_id) 
    if not bus_seq:
        return
    #self.r.delete(process_id)
    return(bus_seq.decode('UTF-8')+'->'+line)
new_stream = data_stream.map(MyMapFunction()).set_parallelism(1)

# 输出到控制台
new_stream.print()

# 执行任务
env.execute('Add "aaaa" to each line')


http://www.niftyadmin.cn/n/393874.html

相关文章

BUG生命周期和管理

1、BUG的影响 精神的摧残 ● 谁会愿意得到垃圾团队的称号&#xff1f; ● BUG有着无穷的生命力&#xff0c;你会很悲观&#xff0c;认为自己已经无能为力了&#xff0c;这种情绪会在长时间的工作后加重。 ● 大家都厌倦重复处理相同的问题&#xff0c;测试人员也已经烦透了长长…

网格搜索:Python遍历网格中每个点

遍历网格中每个点 1. 问题描述2. Python实现2.1 网格参数初始化2.2 遍历赋值2.3 矩阵赋值1. 问题描述 最近需要实现一个对矩阵赋值并对矩阵表示的网格参数进行测试的任务,假设网格的长宽均为k,待搜索的两个参数是Pitch 和 Yaw,存在两个列表中。现在需要将网格上不同参数设置…

Jenkins概念及安装配置教程(四)

什么是Jenkins管道&#xff1f; Jenkins 中的管道是一组按特定顺序相互关联的作业&#xff08;或事件&#xff09;。Jenkins Pipeline 是一组或一套插件&#xff0c;为将持续交付管道实施和集成到 Jenkins 中提供支持。 Pipeline 还提供了一组工具&#xff0c;可用于通过“Pi…

Rust每日一练(Leetday0020) 最后单词的长度、螺旋矩阵II、排列序列

目录 58. 最后一个单词的长度 Length of Last Word &#x1f31f; 59. 螺旋矩阵 II Spiral Matrix II &#x1f31f;&#x1f31f; 60. 排列序列 Permutation Sequence &#x1f31f;&#x1f31f;&#x1f31f; &#x1f31f; 每日一练刷题专栏 &#x1f31f; Rust每日…

计算机网络第一章——计算机系统结构(下)

提示&#xff1a;总角之宴&#xff0c;言笑晏晏。信誓旦旦&#xff0c;不思其反。反是不思&#xff0c;亦已焉哉。 文章目录 1.2.1 分层结构&#xff0c;协议&#xff0c;接口和服务为什么要有分层&#xff1f;怎么分层正式认识分层结构概念总结 1.2.2 OSI 参考模型ISO参考模型…

Java企业级信息系统开发学习笔记14 Spring Boot(使用Spring Initializr方式构建Spring Boot项目)

文章目录 一、使用Spring Initializr方式构建Spring Boot项目&#xff08;一&#xff09;创建Spring Boot项目&#xff08;二&#xff09;创建控制器&#xff08;三&#xff09;运行入口类&#xff08;四&#xff09;访问Web页面&#xff08;五&#xff09;修改访问映射路径 一…

python基本操作3(速通版)

目录 一、字典 1.字典定义 2.字典的访问 3.字典的遍历 4.字典删除 5.字典练习 6.有序字典 7.集合 8.类型转化问题 9.公共方法 二、列表推导式 1.基本操作 2.在循环中使用if 三、组包和拆包 1.组包拆包基本应用 2.拆包的字典问题 四、python函数的一些特性 1.函…

文件与文件系统的打包、压缩、备份

Linux常见的压缩指令 常见压缩文件拓展名 *.Zcompress 程序压缩的文件*.zipzip 程序压缩的文件*.gzgzip 程序压缩的文件*.bz2bzip2 程序压缩的文件*.xzxz 程序压缩的文件*.tartar 程序打包的数据&#xff0c;未经压缩*.tar.gztar 程序打包的数据&#xff0c;并经过gzip的压缩…