[case52] A Look at Flink KeyedStream Aggregation Operations

news/2023/12/1 9:38:58

This article looks at the aggregation operations of Flink's KeyedStream.

Example

    @Test
    public void testMax() throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        WordCount[] data = new WordCount[]{new WordCount(1,"Hello", 1), new
                WordCount(1,"World", 3), new WordCount(2,"Hello", 1)};
        env.fromElements(data)
                .keyBy("word")
                .max("frequency")
                .addSink(new SinkFunction<WordCount>() {
                    @Override
                    public void invoke(WordCount value, Context context) throws Exception {
                        LOGGER.info("value:{}",value);
                    }
                });
        env.execute("testMax");
    }
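  • The stream is first keyed by the word field via keyBy, and KeyedStream's max method then tracks the maximum frequency per key. Each emitted WordCount carries the largest frequency seen so far for its key; because max only overwrites the compared field, the remaining fields still come from the first element received for that key.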
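
The test above does not show the WordCount class. The following is a minimal sketch of what such a POJO could look like, assuming three fields that match the constructor calls; the name of the first field (id) and the holder class WordCountExample are hypothetical and only there to keep the sketch in one file. Flink resolves field expressions such as "word" and "frequency" only on types it recognises as POJOs, which requires a public class, a public no-argument constructor, and fields that are public or reachable through getters and setters.

```java
// Hypothetical holder class so that the sketch compiles as a single file.
class WordCountExample {

    // Assumed shape of the WordCount POJO used by testMax(); the field name "id" is a guess.
    public static class WordCount {
        public int id;
        public String word;
        public int frequency;

        // Public no-argument constructor, needed for Flink to treat the class as a POJO.
        public WordCount() {
        }

        public WordCount(int id, String word, int frequency) {
            this.id = id;
            this.word = word;
            this.frequency = frequency;
        }

        @Override
        public String toString() {
            return "WordCount{id=" + id + ", word='" + word + "', frequency=" + frequency + "}";
        }
    }

    public static void main(String[] args) {
        // Prints: WordCount{id=1, word='World', frequency=3}
        System.out.println(new WordCount(1, "World", 3));
    }
}
```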

KeyedStream.aggregate

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/KeyedStream.java

    public SingleOutputStreamOperator<T> sum(int positionToSum) {
        return aggregate(new SumAggregator<>(positionToSum, getType(), getExecutionConfig()));
    }

    public SingleOutputStreamOperator<T> sum(String field) {
        return aggregate(new SumAggregator<>(field, getType(), getExecutionConfig()));
    }

    public SingleOutputStreamOperator<T> max(int positionToMax) {
        return aggregate(new ComparableAggregator<>(positionToMax, getType(), AggregationFunction.AggregationType.MAX,
                getExecutionConfig()));
    }

    public SingleOutputStreamOperator<T> max(String field) {
        return aggregate(new ComparableAggregator<>(field, getType(), AggregationFunction.AggregationType.MAX,
                false, getExecutionConfig()));
    }

    public SingleOutputStreamOperator<T> min(int positionToMin) {
        return aggregate(new ComparableAggregator<>(positionToMin, getType(), AggregationFunction.AggregationType.MIN,
                getExecutionConfig()));
    }

    public SingleOutputStreamOperator<T> min(String field) {
        return aggregate(new ComparableAggregator<>(field, getType(), AggregationFunction.AggregationType.MIN,
                false, getExecutionConfig()));
    }

    public SingleOutputStreamOperator<T> maxBy(int positionToMaxBy) {
        return this.maxBy(positionToMaxBy, true);
    }

    public SingleOutputStreamOperator<T> maxBy(String positionToMaxBy) {
        return this.maxBy(positionToMaxBy, true);
    }

    public SingleOutputStreamOperator<T> maxBy(int positionToMaxBy, boolean first) {
        return aggregate(new ComparableAggregator<>(positionToMaxBy, getType(), AggregationFunction.AggregationType.MAXBY, first,
                getExecutionConfig()));
    }

    public SingleOutputStreamOperator<T> maxBy(String field, boolean first) {
        return aggregate(new ComparableAggregator<>(field, getType(), AggregationFunction.AggregationType.MAXBY,
                first, getExecutionConfig()));
    }

    public SingleOutputStreamOperator<T> minBy(int positionToMinBy) {
        return this.minBy(positionToMinBy, true);
    }

    public SingleOutputStreamOperator<T> minBy(String positionToMinBy) {
        return this.minBy(positionToMinBy, true);
    }

    public SingleOutputStreamOperator<T> minBy(int positionToMinBy, boolean first) {
        return aggregate(new ComparableAggregator<T>(positionToMinBy, getType(), AggregationFunction.AggregationType.MINBY, first,
                getExecutionConfig()));
    }

    public SingleOutputStreamOperator<T> minBy(String field, boolean first) {
        return aggregate(new ComparableAggregator(field, getType(), AggregationFunction.AggregationType.MINBY,
                first, getExecutionConfig()));
    }

    protected SingleOutputStreamOperator<T> aggregate(AggregationFunction<T> aggregate) {
        StreamGroupedReduce<T> operator = new StreamGroupedReduce<T>(
                clean(aggregate), getType().createSerializer(getExecutionConfig()));
        return transform("Keyed Aggregation", getType(), operator);
    }
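  • KeyedStream's aggregate method is protected. sum, max, min, maxBy, and minBy all delegate to it and differ only in the AggregationFunction they pass in: sum creates a SumAggregator, while the others create a ComparableAggregator whose AggregationType is MAX, MIN, MAXBY, or MINBY respectively.
  • sum, max, and min each have two overloads, one taking an int position and one taking a String field name. maxBy and minBy have four, because each of those two variants also exists with an additional first parameter.
  • The boolean first parameter of maxBy and minBy (true when omitted) specifies whether the first element should be returned when several elements compare as equal.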

ComparableAggregator

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/functions/aggregation/ComparableAggregator.java

@Internal
public class ComparableAggregator<T> extends AggregationFunction<T> {

    private static final long serialVersionUID = 1L;

    private Comparator comparator;
    private boolean byAggregate;
    private boolean first;
    private final FieldAccessor<T, Object> fieldAccessor;

    private ComparableAggregator(AggregationType aggregationType, FieldAccessor<T, Object> fieldAccessor, boolean first) {
        this.comparator = Comparator.getForAggregation(aggregationType);
        this.byAggregate = (aggregationType == AggregationType.MAXBY) || (aggregationType == AggregationType.MINBY);
        this.first = first;
        this.fieldAccessor = fieldAccessor;
    }

    public ComparableAggregator(int positionToAggregate,
            TypeInformation<T> typeInfo,
            AggregationType aggregationType,
            ExecutionConfig config) {
        this(positionToAggregate, typeInfo, aggregationType, false, config);
    }

    public ComparableAggregator(int positionToAggregate,
            TypeInformation<T> typeInfo,
            AggregationType aggregationType,
            boolean first,
            ExecutionConfig config) {
        this(aggregationType, FieldAccessorFactory.getAccessor(typeInfo, positionToAggregate, config), first);
    }

    public ComparableAggregator(String field,
            TypeInformation<T> typeInfo,
            AggregationType aggregationType,
            boolean first,
            ExecutionConfig config) {
        this(aggregationType, FieldAccessorFactory.getAccessor(typeInfo, field, config), first);
    }

    @SuppressWarnings("unchecked")
    @Override
    public T reduce(T value1, T value2) throws Exception {
        Comparable<Object> o1 = (Comparable<Object>) fieldAccessor.get(value1);
        Object o2 = fieldAccessor.get(value2);

        int c = comparator.isExtremal(o1, o2);

        if (byAggregate) {
            // if they are the same we choose based on whether we want to first or last
            // element with the min/max.
            if (c == 0) {
                return first ? value1 : value2;
            }

            return c == 1 ? value1 : value2;

        } else {
            if (c == 0) {
                value1 = fieldAccessor.set(value1, o2);
            }
            return value1;
        }
    }
}
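  • ComparableAggregator extends AggregationFunction, which in turn implements the ReduceFunction interface. Its reduce method first uses the Comparator to compare the aggregated field of the two elements and then branches on byAggregate. If byAggregate is true (MAXBY or MINBY), a comparison result of 0 is a tie, and first decides whether value1 (the element seen earlier) or value2 is returned; for a non-zero result, value1 is returned when the result is 1 and value2 otherwise. If byAggregate is false (MAX or MIN), a result of 0 means that value1's field is not strictly more extreme than value2's (for MAX: value1's field is less than or equal to value2's), so value2's field value is written into value1 through the FieldAccessor, using reflection; in every case value1 is what gets returned.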
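
The two branches of reduce are easier to tell apart on concrete data. Below is a plain-Java imitation of that logic, not the Flink classes themselves: Rec, max, and maxBy are hypothetical names, and the sketch copies records instead of mutating value1 through a FieldAccessor. It also prints only the final result, whereas a keyed aggregation in Flink emits a result for every incoming element.

```java
import java.util.Arrays;
import java.util.List;

// Plain-Java imitation of the reduce logic described above; no Flink classes are used.
class RollingAggregationSketch {

    // Hypothetical record type standing in for WordCount (one key, so the word is omitted).
    static final class Rec {
        final int id;
        final int frequency;

        Rec(int id, int frequency) {
            this.id = id;
            this.frequency = frequency;
        }

        @Override
        public String toString() {
            return "Rec(id=" + id + ", frequency=" + frequency + ")";
        }
    }

    // max(): keep the accumulated record and overwrite only the compared field.
    static Rec max(Rec acc, Rec next) {
        int c = Integer.compare(acc.frequency, next.frequency) > 0 ? 1 : 0;
        return c == 0 ? new Rec(acc.id, next.frequency) : acc;
    }

    // maxBy(): return one of the two records as a whole; 'first' breaks ties.
    static Rec maxBy(Rec acc, Rec next, boolean first) {
        int c = Integer.compare(acc.frequency, next.frequency);
        if (c == 0) {
            return first ? acc : next;
        }
        return c > 0 ? acc : next;
    }

    public static void main(String[] args) {
        List<Rec> input = Arrays.asList(new Rec(1, 1), new Rec(2, 5), new Rec(3, 5));
        Rec rollingMax = input.get(0);
        Rec byFirst = input.get(0);
        Rec byLast = input.get(0);
        for (Rec r : input.subList(1, input.size())) {
            rollingMax = max(rollingMax, r);
            byFirst = maxBy(byFirst, r, true);
            byLast = maxBy(byLast, r, false);
        }
        System.out.println(rollingMax); // Rec(id=1, frequency=5)
        System.out.println(byFirst);    // Rec(id=2, frequency=5)
        System.out.println(byLast);     // Rec(id=3, frequency=5)
    }
}
```

With max, the id stays at 1 because only the frequency is carried over; with maxBy, the whole record holding the maximum is returned, and first picks between records 2 and 3, which tie at frequency 5.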

AggregationFunction

@Internal
public abstract class AggregationFunction<T> implements ReduceFunction<T> {
    private static final long serialVersionUID = 1L;

    /**
     * Aggregation types that can be used on a windowed stream or keyed stream.
     */
    public enum AggregationType {
        SUM, MIN, MAX, MINBY, MAXBY,
    }
}
  • AggregationFunction declares that it implements ReduceFunction and defines the AggregationType enum with five values: SUM, MIN, MAX, MINBY, and MAXBY.

Comparator

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/functions/aggregation/Comparator.java

@Internal
public abstract class Comparator implements Serializable {

    private static final long serialVersionUID = 1L;

    public abstract <R> int isExtremal(Comparable<R> o1, R o2);

    public static Comparator getForAggregation(AggregationType type) {
        switch (type) {
        case MAX:
            return new MaxComparator();
        case MIN:
            return new MinComparator();
        case MINBY:
            return new MinByComparator();
        case MAXBY:
            return new MaxByComparator();
        default:
            throw new IllegalArgumentException("Unsupported aggregation type.");
        }
    }

    private static class MaxComparator extends Comparator {

        private static final long serialVersionUID = 1L;

        @Override
        public <R> int isExtremal(Comparable<R> o1, R o2) {
            return o1.compareTo(o2) > 0 ? 1 : 0;
        }

    }

    private static class MaxByComparator extends Comparator {

        private static final long serialVersionUID = 1L;

        @Override
        public <R> int isExtremal(Comparable<R> o1, R o2) {
            int c = o1.compareTo(o2);
            if (c > 0) {
                return 1;
            }
            if (c == 0) {
                return 0;
            } else {
                return -1;
            }
        }

    }

    private static class MinByComparator extends Comparator {

        private static final long serialVersionUID = 1L;

        @Override
        public <R> int isExtremal(Comparable<R> o1, R o2) {
            int c = o1.compareTo(o2);
            if (c < 0) {
                return 1;
            }
            if (c == 0) {
                return 0;
            } else {
                return -1;
            }
        }

    }

    private static class MinComparator extends Comparator {

        private static final long serialVersionUID = 1L;

        @Override
        public <R> int isExtremal(Comparable<R> o1, R o2) {
            return o1.compareTo(o2) < 0 ? 1 : 0;
        }

    }
}
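  • Comparator implements Serializable, declares the abstract method isExtremal, and provides the factory method getForAggregation, which creates a different Comparator for each AggregationType. SUM is not handled there and falls through to the IllegalArgumentException.
  • Comparator defines four nested subclasses, MaxComparator, MinComparator, MinByComparator, and MaxByComparator, each of which implements isExtremal.
  • MaxComparator relies directly on the compareTo method of the Comparable interface but returns only 0 or 1: it returns 1 when compareTo is greater than 0 and 0 otherwise, so "equal" and "less than" are indistinguishable. MaxByComparator also starts from compareTo but has three possible results: 1 for greater than, 0 for equal, and -1 for less than. MinComparator and MinByComparator mirror this with the comparison reversed.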
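
The difference in return values can be restated in a few lines of plain Java. This is a sketch that mirrors the two isExtremal implementations shown above rather than calling them; the class and method names are hypothetical.

```java
// Plain-Java restatement of the return values described above; not the Flink classes themselves.
class IsExtremalSketch {

    // Mirrors MaxComparator: 1 only when o1 is strictly greater, otherwise 0.
    static <R> int maxIsExtremal(Comparable<R> o1, R o2) {
        return o1.compareTo(o2) > 0 ? 1 : 0;
    }

    // Mirrors MaxByComparator: normalises compareTo to 1, 0, or -1.
    static <R> int maxByIsExtremal(Comparable<R> o1, R o2) {
        return Integer.signum(o1.compareTo(o2));
    }

    public static void main(String[] args) {
        int[][] pairs = {{5, 3}, {3, 3}, {1, 3}};
        for (int[] p : pairs) {
            Integer left = p[0];
            Integer right = p[1];
            System.out.println(left + " vs " + right
                    + " -> max=" + maxIsExtremal(left, right)
                    + ", maxBy=" + maxByIsExtremal(left, right));
        }
        // 5 vs 3 -> max=1, maxBy=1
        // 3 vs 3 -> max=0, maxBy=0
        // 1 vs 3 -> max=0, maxBy=-1
    }
}
```

Because the MAX variant collapses "equal" and "less than" into 0, only the MAXBY variant gives reduce enough information to treat ties separately.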

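Summary

  • KeyedStream's aggregation operations are sum, max, min, maxBy, and minBy. All of them call the protected aggregate method and differ only in the AggregationFunction they pass in: sum uses a SumAggregator, and the others use a ComparableAggregator whose AggregationType is MAX, MIN, MAXBY, or MINBY.
  • ComparableAggregator extends AggregationFunction, which implements the ReduceFunction interface. Its reduce method compares the two elements through the Comparator and then branches on byAggregate. For byAggregate operations, a result of 0 is a tie and first decides whether the earlier or the later element is returned; for a non-zero result, value1 is returned when the result is 1 and value2 otherwise. For non-byAggregate operations, a result of 0 causes value2's field value to be written into value1 via reflection, and value1 is always what gets returned.
  • Comparator defines four nested subclasses, MaxComparator, MinComparator, MinByComparator, and MaxByComparator, each implementing isExtremal. MaxComparator returns 1 for greater than and 0 for less than or equal, whereas MaxByComparator is finer-grained and returns 1, 0, or -1 for greater than, equal, and less than. The same distinction shows up in ComparableAggregator's reduce method: maxBy and minBy take an extra boolean first parameter that exists solely to choose which element to return when the comparison result is 0, while for non-byAggregate operations reduce always returns value1, overwriting its compared field with value2's whenever value1's field is not strictly more extreme.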

doc

  • DataStream Transformations
