范文健康探索娱乐情感热点
热点动态
科技财经
情感日志
励志美文
娱乐时尚
游戏搞笑
探索旅游
历史星座
健康养生
美丽育儿
范文作文
教案论文

WebFlux前置知识(四)

  1.Backpressure
  Backpressure 在国内被翻译成背压,这个翻译在网上被很多人吐槽,我觉得大家的吐槽是有道理的,背压单纯从字面上确实看不出来有什么意思。所以松哥这里直接用英文 Backpressure 吧。
  Backpressure 是一种现象:当数据流从上游生产者向下游消费者传输的过程中,上游生产速度大于下游消费速度,导致下游的 Buffer 溢出,这种现象就叫做 Backpressure。
  换句话说,上游生产数据,生产完成后通过管道将数据传到下游,下游消费数据,当下游消费速度小于上游数据生产速度时,数据在管道中积压会对上游形成一个压力,这就是 Backpressure,从这个角度来说,Backpressure 翻译成反压、回压似乎更合理一些。
  Backpressure 会出现在有 Buffer 上限的系统中,当出现 Buffer 溢出的时候,就会有 Backpressure,对于 Backpressure,它的应对措施只有一个:丢弃新事件。那么什么是 Buffer 溢出呢?例如我的服务器可以同时处理 2000 个用户请求,那么我就把请求上限设置为 2000,这个 2000 就是我的 Buffer,当超出 2000 的时候,就产生了 Backpressure。  2.Flow API
  JDK9 中推出了 Flow API,用以支持 Reactive Programming,即响应式编程。
  在响应式编程中,会有一个数据发布者 Publisher 和数据订阅者 Subscriber,Subscriber 接收 Publisher 发布的数据并进行消费,在 Subscriber 和 Publisher 之间还存在一个 Processor,类似于一个过滤器,可以对数据进行中间处理。
  JDK9 中提供了 Flow API 用以支持响应式编程,另外 RxJava 和 Reactor 等框架也提供了相关的实现。
  我们来看看 JDK9 中的 Flow 类:
  非常简洁,基本上就是按照 Reactive Programming 的设计来的:
  Publisher
  Publisher 为数据发布者,这是一个函数式接口,里边只有一个方法,通过这个方法将数据发布出去,Publisher 的定义如下:  @FunctionalInterface public static interface Publisher {     public void subscribe(Subscriber<? super T> subscriber); }
  Subscriber
  Subscriber 为数据订阅者,这个里边有四个方法,如下:  public static interface Subscriber {     public void onSubscribe(Subscription subscription);     public void onNext(T item);     public void onError(Throwable throwable);     public void onComplete(); } onSubscribe:这个是订阅成功的回调方法,用于初始化 Subscription,并且表明可以开始接收订阅数据了。  onNext:接收下一项订阅数据的回调方法。  onError:在 Publisher 或 Subcriber 遇到不可恢复的错误时调用此方法,之后 Subscription 不会再调用 Subscriber 其他的方法。  onComplete:当接收完所有订阅数据,并且发布者已经关闭后会回调这个方法。
  Subscription
  Subscription 为发布者和订阅者之间的订阅关系,用来控制消息的消费,这个里边有两个方法:  public static interface Subscription {     public void request(long n);     public void cancel(); } request:这个方法用来向数据发布者请求 n 个数据。  cancel:取消消息订阅,订阅者将不再接收数据。
  Processor
  Processor 是一个空接口,不过它同时继承了 Publisher 和 Subscriber,所以它既能发布数据也能订阅数据,因此我们可以通过 Processor 来完成一些数据转换的功能,先接收数据进行处理,处理完成后再将数据发布出去,这个也有点类似于我们 JavaEE 中的过滤器。  public static interface Processor extends Subscriber, Publisher { } 2.1 消息订阅初体验
  我们通过如下一段代码体验一下消息的订阅与发布:  public class FlowDemo {     public static void main(String[] args) {         SubmissionPublisher publisher = new SubmissionPublisher<>();         Flow.Subscriber subscriber = new Flow.Subscriber() {             private Flow.Subscription subscription;             @Override             public void onSubscribe(Flow.Subscription subscription) {                 this.subscription = subscription;                 //向数据发布者请求一个数据                 this.subscription.request(1);             }             @Override             public void onNext(String item) {                 System.out.println("接收到 publisher 发来的消息了:" + item);                 //接收完成后,可以继续接收或者不接收                 //this.subscription.cancel();                 this.subscription.request(1);             }             @Override             public void onError(Throwable throwable) {                 //出现异常,就会来到这个方法,此时直接取消订阅即可                 this.subscription.cancel();             }             @Override             public void onComplete() {                 //发布者的所有数据都被接收,并且发布者已经关闭                 System.out.println("数据接收完毕");             }         };         //配置发布者和订阅者         publisher.subscribe(subscriber);         for (int i = 0; i < 5; i++) {             //发送数据             publisher.submit("hello:" + i);         }         //关闭发布者         publisher.close();         new Scanner(System.in).next();     } }
  松哥稍微解释一下上面这段代码:  首先创建一个 SubmissionPublisher 对象作为消息发布者。  接下来创建 Flow.Subscriber 对象作为消息订阅者,实现消息订阅者里边的四个方法,分别进行处理。  为 publisher 配置上 subscriber。  发送消息。  消息发送完成后关闭 publisher。  最后是让程序不要停止,观察消息订阅者打印情况。  2.2 模拟 Backpressure
  Backpressure 问题在 Flow API 中得到了很好的解决。Subscriber 会将 Publisher 发布的数据缓存在 Subscription 中,其长度默认为256,相关源码如下:  public final class Flow {     static final int DEFAULT_BUFFER_SIZE = 256;     public static int defaultBufferSize() {         return DEFAULT_BUFFER_SIZE;     }     ... }
  一旦超出这个数据量,publisher 就会降低数据发送速度。
  我们对上面的案例进行修改,如下:  public class FlowDemo {     public static void main(String[] args) {         SubmissionPublisher publisher = new SubmissionPublisher<>();          Flow.Subscriber subscriber = new Flow.Subscriber() {             private Flow.Subscription subscription;              @Override             public void onSubscribe(Flow.Subscription subscription) {                 this.subscription = subscription;                 //向数据发布者请求一个数据                 this.subscription.request(1);             }              @Override             public void onNext(String item) {                 System.out.println("接收到 publisher 发来的消息了:" + item);                 //接收完成后,可以继续接收或者不接收                 //this.subscription.cancel();                 try {                     Thread.sleep(2000);                 } catch (InterruptedException e) {                     e.printStackTrace();                 }                 this.subscription.request(1);             }              @Override             public void onError(Throwable throwable) {                 //出现异常,就会来到这个方法,此时直接取消订阅即可                 this.subscription.cancel();             }              @Override             public void onComplete() {                 //发布者的所有数据都被接收,并且发布者已经关闭                 System.out.println("数据接收完毕");             }         };         publisher.subscribe(subscriber);         for (int i = 0; i < 500; i++) {             System.out.println("i--------->" + i);             publisher.submit("hello:" + i);         }         //关闭发布者         publisher.close();         new Scanner(System.in).next();     } }
  一共修改了三个地方:  Subscriber#onNext 方法中,每次休息两秒再处理下一条数据。  发布数据时,一共发布 500 条数据。  打印数据发布的日志。
  修改完成后,我们再次启动项目,观察控制台输出:
  可以看到,生产者先是一股脑生产了 257 条数据(hello0 在一开始就被消费了,所以缓存中实际上是 256 条),消息则是一条一条的来,由于消费的速度比较慢,所以当缓存中的数据超过 256 条之后,接下来都是消费一条,再发送一条。  2.3 数据处理
  Flow.Processor  可以像过滤器一样,对数据进行预处理,数据从 publisher 出来之后,先进入 Flow.Processor  中进行预处理,然后再进入 Subscriber。
  修改后的代码如下:  public class FlowDemo {     public static void main(String[] args) {          class DataFilter extends SubmissionPublisher implements Flow.Processor{              private Flow.Subscription subscription;              @Override             public void onSubscribe(Flow.Subscription subscription) {                 this.subscription = subscription;                 this.subscription.request(1);             }              @Override             public void onNext(String item) {                 this.submit("【这是一条被处理过的数据】" + item);                 this.subscription.request(1);             }              @Override             public void onError(Throwable throwable) {                 this.subscription.cancel();             }              @Override             public void onComplete() {                 this.close();             }         }          SubmissionPublisher publisher = new SubmissionPublisher<>();         DataFilter dataFilter = new DataFilter();         publisher.subscribe(dataFilter);          Flow.Subscriber subscriber = new Flow.Subscriber() {             private Flow.Subscription subscription;              @Override             public void onSubscribe(Flow.Subscription subscription) {                 this.subscription = subscription;                 //向数据发布者请求一个数据                 this.subscription.request(1);             }              @Override             public void onNext(String item) {                 System.out.println("接收到 publisher 发来的消息了:" + item);                 //接收完成后,可以继续接收或者不接收                 //this.subscription.cancel();                 try {                     Thread.sleep(2000);                 } catch (InterruptedException e) {                     e.printStackTrace();                 }                 this.subscription.request(1);             }              @Override             public void onError(Throwable throwable) {                 //出现异常,就会来到这个方法,此时直接取消订阅即可                 this.subscription.cancel();             }              @Override             public void onComplete() {                 //发布者的所有数据都被接收,并且发布者已经关闭                 System.out.println("数据接收完毕");             }         };         dataFilter.subscribe(subscriber);         for (int i = 0; i < 500; i++) {             System.out.println("发送消息 i--------->" + i);             publisher.submit("hello:" + i);         }         //关闭发布者         publisher.close();         new Scanner(System.in).next();     } }
  简单起见,我这里创建了一个局部内部类 DataFilter,DataFilter 继承自 SubmissionPublisher 并实现了 Flow.Processor 接口,由于 DataFilter 继承自 SubmissionPublisher,所以它也兼具 SubmissionPublisher 的功能。
  在 DataFilter 中完成消息的处理并重新发送出去。接下来定义 publisher,让 dataFilter 作为其订阅者,再定义新的订阅者,作为 dataFilter 的订阅者。
  最终运行效果如下:
  3.小结
  好啦,这就是今天和大家介绍的 Java9 中的 Reactive Stream,那么至此,我们的 WebFlux 前置知识差不多告一段落了,下篇文章开始,正式开整 WebFlux。
  转载自:https://mp.weixin.qq.com/s/BfgQ760h_WeUOBRrgx1ubA

为票房拼了?毒液2导演宣布毒液出柜,观众能买账吗毒液出柜火速登上热搜,毒液2导演公开表示毒液2是一场关于爱情的电影,两位主角自然是毒液和埃迪。毒液2已拍摄完毕,将于今年正式上映,可能是为了宣传毒液2,导演安迪瑟金斯还表示电影里会听到完结消息感到不舍的漫画,除了一部被腰斩之外,其他都是经典总有那么一部分漫画,我们既想快点看到它的结局,又不想感受到漫画连载完结的失落。岛国网友评选出漫画完结之后让人感到不舍的漫画作品,除了一部漫画因为意外原因被腰斩完结之外,其他的漫画都探游内外绚丽太古星河战法变身普希娅三觉拆解一览为什么战法和四姨的变身不能像四叔一样可以全程保持,因为太漂亮也是一种罪过所以不能全程,通过展示素材与动作,诸位看完后想必也是同样想法。PS由于素材分辨率很高(1500),部分素材不探游内外血气拉满血法师未和谐三觉展示一览吸血鬼的魅力无需在这多说什么,当初DNF首度加入了吸血鬼题材,曾引起了一阵热潮,也算补齐了职业特色,在此不过多叙述,直接进入主题(应网友提议布局略作改动与简化,不知效果如何权当一试探游报告考验耐心之作灵魂潮汐评测报告适用0。41版灵魂潮汐已经上线了一段时间,各方评测与评分都有很多,普遍观点是找到了久违的探索感,游戏中设计的关卡解谜,相比时下的手游设计,确实有趣很多,只不过习惯了自动寻路自动战斗后,一段时间内熊出没中出现的5对双胞胎,不出意外你应该全都知道!在熊出没这部经典动画中,一共出现了5对双胞胎,来看看你都认识哪些吧1。熊大和熊二作为本作的男主和男二,这一对双胞胎大家肯定都非常熟悉了,就算是不看熊出没的大人也都知道他们俩的存在。你没看过的猫和老鼠,这一集汤姆恩将仇报,把杰瑞害惨了今天再给大家讲一讲吉恩戴奇版本的猫和老鼠,我找了好久终于把吉恩戴奇的猫和老鼠找全了,而这一集大部分人肯定都没有看过,话不多说,一起来看看吧!这天晚上飘着鹅毛大雪,城市中盖上了一层白哈弗H6混动版什么时候会出来?哈弗H6出来好多年了,销量好的出奇!一直是SUV销量冠军,这些年来,唯一二次不是销量冠军的原因,就是给宝骏510。哈弗H6把销量冠军都做腻了!有点中年油腻大叔的味道了!不过,哈弗H上海特斯拉超级充电桩工厂,终于建成了都说磨刀不误砍柴功!特斯拉要卖车,都是送了个随身充。不对!是一个家用充电桩。这是好东西啊!拿回家,找个电工来,装上,就可以在家里充电了。不过,很多人不是住别墅的,家里院子不够大,装超级充电技术的发展会不会把蔚来汽车淘汰?什么是快充技术?一般来说,充电功率在350KW或以上,以单枪方式供给动力电池传导充电,并在10到15分钟内充满8090的电量技术,就可以算是快充技术了。当然,随着技术的发展,快充技为什么说特斯拉的地位,暂时无人可以撼动?特斯拉是电动汽车公认的一哥,这一情况,在未来很长一段时间内,都是无法改变的。宏光MINI电动汽车有超过特斯拉的销量,比亚迪也有超过特斯拉的销量。不过,客观地说,它们目前为止,都没有
FPX成了赛制改革的牺牲品?一万经济被翻盘EDG夺夏冠,小天背大锅万众瞩目的LPL夏季决赛终于如期开打,所有LPL观众都在期待FPX和EDG的这次交手。作为夏季赛整体表现最好的两支队伍,FPX和EDG的纸面实力差距不大,关键就看哪支队伍在决赛能够WE不敌FPX掉入败者组!名宿微笑赛后承认失利输得心服口服作为LPL曾经的豪强战队,WE在这个夏天重新崛起挺进4强。随着呼吸哥的加入WE整体实力有显著提升,特别是中单香克斯在季后赛成长迅速,一度有成为国产第一中单的趋势。因此虽然他们即将挑你认为王者荣耀微信区和Q区有什么区别?1QQ区喷子多,微信区高素质玩家多关于这一点可以从QQ区与微信区的年龄分层来讨论。玩QQ区的年龄都比较小,一般都在30岁以下,而微信区的年龄与QQ区相比较,年龄层偏大,更多的是25王者荣耀狄仁杰增强后最新出装,大招输出爆炸,可秒杀万血张飞狄仁杰在S13赛季算是火了,从赛季初就开始不停的在增强,相对比同为射手的鲁班和后羿来说,不知道要比他们强横多少倍。看我这么说,可能会有鲁班和后羿的铁粉不服,认为狄仁杰不可能比他们强RNG为什么说是好签?S11世界赛抽签结果出炉,阿布LPL保底8强由于多种原因,S11世界赛的举办地由国内改为了冰岛。虽然是临时决定改动举办地,但拳头还是迅速解决好了一切问题,各大战队最近都在着手准备前往冰岛备战。经过这段时间的等待后,我们也终于老夫子无敌出装,可以打爆典韦,抓死关羽,团战单挑都无敌!老夫子是王者荣耀之中非常强势的战士英雄,他的单挑带线能力非常强大,如果运用的好,完全可以在逆风局之中引领大家走向胜利。当然,这也是和他的出装有着直接的关系,如果出装不正确,那么老夫最强王者上去了,那该如何冲刺荣耀王者?心态问题。对于挑选英雄很多人有偏见,不得不说确实是有强势之分不过以下的人你很难上王者。阵容搭配。好的阵容才能打天下,进游戏尽量不要秒选,可以先考虑队友选什么,然后在自己的手的英雄中S11开门红!LNG击溃韩华拿首胜,阿乐武器大师一战打崩国内论坛大家都知道,由于各种原因,本次S11世界赛的举办地由国内改成了冰岛。而经过一段时间的等待后,我们终于迎来了S11世界赛的首日比赛。作为全球范围内最强的两个赛区,LCK和LPL谁的实王者荣耀江湖失传已久的吕布最强出装,秒杀万血张飞只需要三刀吕布在王者荣耀之中一位强大的英雄,虽然在坦克的行列,但是要论输出能力没有人能比上他。因为吕布的真实伤害非常爆炸,而且还能暴击,到了后期一刀3000血都是常规操作,所以哪怕是万血张飞S11揭幕战FPX完败DK!5位世界冠军被打崩,玩家BP看不起谁?经过一段时间的等候后,我们终于迎来了S11世界赛的小组赛。作为本次世界赛的揭幕战,FPX和DK这两支世界冠军队伍的较量吸引了许多玩家的目光。这场比赛不仅是9位世界冠军之间的对抗,还王者荣耀什么玩家不配赢?心态不好的,挑衅队友,搞队友心态的玩家。游戏没开始时问人家会不会,让人家发战绩,他如果是边路打野你到底会不会玩?开始给其它玩家分配职业,中路你怎么不来支援?射手你怎么不推塔?辅助你