范文健康探索娱乐情感热点
热点动态
科技财经
情感日志
励志美文
娱乐时尚
游戏搞笑
探索旅游
历史星座
健康养生
美丽育儿
范文作文
教案论文

WebFlux前置知识(四)

  1.Backpressure
  Backpressure 在国内被翻译成背压,这个翻译在网上被很多人吐槽,我觉得大家的吐槽是有道理的,背压单纯从字面上确实看不出来有什么意思。所以松哥这里直接用英文 Backpressure 吧。
  Backpressure 是一种现象:当数据流从上游生产者向下游消费者传输的过程中,上游生产速度大于下游消费速度,导致下游的 Buffer 溢出,这种现象就叫做 Backpressure。
  换句话说,上游生产数据,生产完成后通过管道将数据传到下游,下游消费数据,当下游消费速度小于上游数据生产速度时,数据在管道中积压会对上游形成一个压力,这就是 Backpressure,从这个角度来说,Backpressure 翻译成反压、回压似乎更合理一些。
  Backpressure 会出现在有 Buffer 上限的系统中,当出现 Buffer 溢出的时候,就会有 Backpressure,对于 Backpressure,它的应对措施只有一个:丢弃新事件。那么什么是 Buffer 溢出呢?例如我的服务器可以同时处理 2000 个用户请求,那么我就把请求上限设置为 2000,这个 2000 就是我的 Buffer,当超出 2000 的时候,就产生了 Backpressure。  2.Flow API
  JDK9 中推出了 Flow API,用以支持 Reactive Programming,即响应式编程。
  在响应式编程中,会有一个数据发布者 Publisher 和数据订阅者 Subscriber,Subscriber 接收 Publisher 发布的数据并进行消费,在 Subscriber 和 Publisher 之间还存在一个 Processor,类似于一个过滤器,可以对数据进行中间处理。
  JDK9 中提供了 Flow API 用以支持响应式编程,另外 RxJava 和 Reactor 等框架也提供了相关的实现。
  我们来看看 JDK9 中的 Flow 类:
  非常简洁,基本上就是按照 Reactive Programming 的设计来的:
  Publisher
  Publisher 为数据发布者,这是一个函数式接口,里边只有一个方法,通过这个方法将数据发布出去,Publisher 的定义如下:  @FunctionalInterface public static interface Publisher {     public void subscribe(Subscriber<? super T> subscriber); }
  Subscriber
  Subscriber 为数据订阅者,这个里边有四个方法,如下:  public static interface Subscriber {     public void onSubscribe(Subscription subscription);     public void onNext(T item);     public void onError(Throwable throwable);     public void onComplete(); } onSubscribe:这个是订阅成功的回调方法,用于初始化 Subscription,并且表明可以开始接收订阅数据了。  onNext:接收下一项订阅数据的回调方法。  onError:在 Publisher 或 Subcriber 遇到不可恢复的错误时调用此方法,之后 Subscription 不会再调用 Subscriber 其他的方法。  onComplete:当接收完所有订阅数据,并且发布者已经关闭后会回调这个方法。
  Subscription
  Subscription 为发布者和订阅者之间的订阅关系,用来控制消息的消费,这个里边有两个方法:  public static interface Subscription {     public void request(long n);     public void cancel(); } request:这个方法用来向数据发布者请求 n 个数据。  cancel:取消消息订阅,订阅者将不再接收数据。
  Processor
  Processor 是一个空接口,不过它同时继承了 Publisher 和 Subscriber,所以它既能发布数据也能订阅数据,因此我们可以通过 Processor 来完成一些数据转换的功能,先接收数据进行处理,处理完成后再将数据发布出去,这个也有点类似于我们 JavaEE 中的过滤器。  public static interface Processor extends Subscriber, Publisher { } 2.1 消息订阅初体验
  我们通过如下一段代码体验一下消息的订阅与发布:  public class FlowDemo {     public static void main(String[] args) {         SubmissionPublisher publisher = new SubmissionPublisher<>();         Flow.Subscriber subscriber = new Flow.Subscriber() {             private Flow.Subscription subscription;             @Override             public void onSubscribe(Flow.Subscription subscription) {                 this.subscription = subscription;                 //向数据发布者请求一个数据                 this.subscription.request(1);             }             @Override             public void onNext(String item) {                 System.out.println("接收到 publisher 发来的消息了:" + item);                 //接收完成后,可以继续接收或者不接收                 //this.subscription.cancel();                 this.subscription.request(1);             }             @Override             public void onError(Throwable throwable) {                 //出现异常,就会来到这个方法,此时直接取消订阅即可                 this.subscription.cancel();             }             @Override             public void onComplete() {                 //发布者的所有数据都被接收,并且发布者已经关闭                 System.out.println("数据接收完毕");             }         };         //配置发布者和订阅者         publisher.subscribe(subscriber);         for (int i = 0; i < 5; i++) {             //发送数据             publisher.submit("hello:" + i);         }         //关闭发布者         publisher.close();         new Scanner(System.in).next();     } }
  松哥稍微解释一下上面这段代码:  首先创建一个 SubmissionPublisher 对象作为消息发布者。  接下来创建 Flow.Subscriber 对象作为消息订阅者,实现消息订阅者里边的四个方法,分别进行处理。  为 publisher 配置上 subscriber。  发送消息。  消息发送完成后关闭 publisher。  最后是让程序不要停止,观察消息订阅者打印情况。  2.2 模拟 Backpressure
  Backpressure 问题在 Flow API 中得到了很好的解决。Subscriber 会将 Publisher 发布的数据缓存在 Subscription 中,其长度默认为256,相关源码如下:  public final class Flow {     static final int DEFAULT_BUFFER_SIZE = 256;     public static int defaultBufferSize() {         return DEFAULT_BUFFER_SIZE;     }     ... }
  一旦超出这个数据量,publisher 就会降低数据发送速度。
  我们对上面的案例进行修改,如下:  public class FlowDemo {     public static void main(String[] args) {         SubmissionPublisher publisher = new SubmissionPublisher<>();          Flow.Subscriber subscriber = new Flow.Subscriber() {             private Flow.Subscription subscription;              @Override             public void onSubscribe(Flow.Subscription subscription) {                 this.subscription = subscription;                 //向数据发布者请求一个数据                 this.subscription.request(1);             }              @Override             public void onNext(String item) {                 System.out.println("接收到 publisher 发来的消息了:" + item);                 //接收完成后,可以继续接收或者不接收                 //this.subscription.cancel();                 try {                     Thread.sleep(2000);                 } catch (InterruptedException e) {                     e.printStackTrace();                 }                 this.subscription.request(1);             }              @Override             public void onError(Throwable throwable) {                 //出现异常,就会来到这个方法,此时直接取消订阅即可                 this.subscription.cancel();             }              @Override             public void onComplete() {                 //发布者的所有数据都被接收,并且发布者已经关闭                 System.out.println("数据接收完毕");             }         };         publisher.subscribe(subscriber);         for (int i = 0; i < 500; i++) {             System.out.println("i--------->" + i);             publisher.submit("hello:" + i);         }         //关闭发布者         publisher.close();         new Scanner(System.in).next();     } }
  一共修改了三个地方:  Subscriber#onNext 方法中,每次休息两秒再处理下一条数据。  发布数据时,一共发布 500 条数据。  打印数据发布的日志。
  修改完成后,我们再次启动项目,观察控制台输出:
  可以看到,生产者先是一股脑生产了 257 条数据(hello0 在一开始就被消费了,所以缓存中实际上是 256 条),消息则是一条一条的来,由于消费的速度比较慢,所以当缓存中的数据超过 256 条之后,接下来都是消费一条,再发送一条。  2.3 数据处理
  Flow.Processor  可以像过滤器一样,对数据进行预处理,数据从 publisher 出来之后,先进入 Flow.Processor  中进行预处理,然后再进入 Subscriber。
  修改后的代码如下:  public class FlowDemo {     public static void main(String[] args) {          class DataFilter extends SubmissionPublisher implements Flow.Processor{              private Flow.Subscription subscription;              @Override             public void onSubscribe(Flow.Subscription subscription) {                 this.subscription = subscription;                 this.subscription.request(1);             }              @Override             public void onNext(String item) {                 this.submit("【这是一条被处理过的数据】" + item);                 this.subscription.request(1);             }              @Override             public void onError(Throwable throwable) {                 this.subscription.cancel();             }              @Override             public void onComplete() {                 this.close();             }         }          SubmissionPublisher publisher = new SubmissionPublisher<>();         DataFilter dataFilter = new DataFilter();         publisher.subscribe(dataFilter);          Flow.Subscriber subscriber = new Flow.Subscriber() {             private Flow.Subscription subscription;              @Override             public void onSubscribe(Flow.Subscription subscription) {                 this.subscription = subscription;                 //向数据发布者请求一个数据                 this.subscription.request(1);             }              @Override             public void onNext(String item) {                 System.out.println("接收到 publisher 发来的消息了:" + item);                 //接收完成后,可以继续接收或者不接收                 //this.subscription.cancel();                 try {                     Thread.sleep(2000);                 } catch (InterruptedException e) {                     e.printStackTrace();                 }                 this.subscription.request(1);             }              @Override             public void onError(Throwable throwable) {                 //出现异常,就会来到这个方法,此时直接取消订阅即可                 this.subscription.cancel();             }              @Override             public void onComplete() {                 //发布者的所有数据都被接收,并且发布者已经关闭                 System.out.println("数据接收完毕");             }         };         dataFilter.subscribe(subscriber);         for (int i = 0; i < 500; i++) {             System.out.println("发送消息 i--------->" + i);             publisher.submit("hello:" + i);         }         //关闭发布者         publisher.close();         new Scanner(System.in).next();     } }
  简单起见,我这里创建了一个局部内部类 DataFilter,DataFilter 继承自 SubmissionPublisher 并实现了 Flow.Processor 接口,由于 DataFilter 继承自 SubmissionPublisher,所以它也兼具 SubmissionPublisher 的功能。
  在 DataFilter 中完成消息的处理并重新发送出去。接下来定义 publisher,让 dataFilter 作为其订阅者,再定义新的订阅者,作为 dataFilter 的订阅者。
  最终运行效果如下:
  3.小结
  好啦,这就是今天和大家介绍的 Java9 中的 Reactive Stream,那么至此,我们的 WebFlux 前置知识差不多告一段落了,下篇文章开始,正式开整 WebFlux。
  转载自:https://mp.weixin.qq.com/s/BfgQ760h_WeUOBRrgx1ubA

传奇怒火一刀最新打金传奇怒火一刀,快来下载玩玩吧!打金传奇怒火一刀再现经典,别再犹疑,来下载试试吧!打金传奇怒火一刀是一款经典复古的传奇对战的手游,玩家在打金传奇怒火一刀可以体验到真实风趣的玩iOS12版本闪退修复说明亲爱的旅者在3月10号更新版本后,我们关注到有部分旅者反馈iOS12版本进入游戏有概率会闪退,影响游戏正常体验的问题,对此我们进行了优化和适配。遇到该问题的旅者,可以在3月31日1王者荣耀已经进入老龄化,最新数据显示,最强王者烂大街王者荣耀作为腾讯的王牌手游,这些年在天美的不断更新与优化下,已经算是moba手游领域的行业标杆。发展至今王者荣耀已经开启了s26赛季,同时也出现了问题。经常玩这款游戏的网友应该也发王者荣耀澜传说级皮肤曝光,机甲风格巨帅,宫本新形象公布王者荣耀上线六年的时间,峡谷中皮肤数量多达400多款,一款高质量的皮肤不但可以提升玩家的手感,同时还可以增加局内的视觉效果,甚至有些皮肤使用起来就像是换了一位英雄,因为天美上线的新玩游戏如何赚米游戏这玩意,能赚钱吗?实不相瞒,游戏确实很赚钱。那普通人能做吗?当然能,你所看到的很多项目都是普通人操作的,可能只是很多人不知道如何入手罢了。那今天我呢就跟大家简单讲解一下,新手到庆余年手游职业推荐哪个职业厉害适合平民庆余年手游中,根据武器类型可划分为五种职业,分别是剑双剑双刃棍和琴,许多玩家开局也是不知道选什么职业好,本次小编就带来了职业的选择建议,快来了解一番吧!庆余年手游职业选择哪个好?首游戏推广电话销售之套路游戏推广电销套路本方法按照小组的形式组成与配合,一般一个小组6人组成。5个推广员(打电话人员),1个团长(带游戏,引导消费)必须懂游戏,脑子灵活,有组织能力需要准备的硬件设施,打电NAVICEO我们希望保留CS阵容,并希望三人能从俄罗斯搬离乌克兰俱乐部NAVI的首席执行官YevhenZolotarov在近期接受了美国华盛顿邮报的采访,在采访中他谈到了战争对俱乐部的影响,以及NAVI旗下CSGO团队的未来。其中Yevh吃鸡新联动机甲套装,3。27更新后全服免费送,光子太良心了大家好,欢迎来到由小鱼干开讲的吃鸡新鲜事速报,这段时间玩家们的注意力,基本上都被体验服吸引住了,这天体验服也是进行了第4轮更新。不过更新了两百多M的内容,小鱼干进入看了半天,却都没大魔神3D传奇元宝最合理使用途径介绍元宝怎么用性价比最高大魔神3D传奇手游中元宝这个资源不管是在什么阶段都是非常重要的了,不过我们能够获得的途径还是相当有限的,所以说每个元宝我们都是需要好好的进行利用,这样才能够有更高的性价比,如果是随任天堂推迟发布塞尔达传说荒野之息续作,股价一度跌超6记者彭新编辑在于3月29日晚宣布推迟最新塞尔达传说游戏后,日本游戏公司任天堂股价今日大幅下跌,一度跌超6。任天堂称,计划2023年春季发布热门动作冒险游戏塞尔达传说荒野之息(The
科幻开放世界手游幻塔开启预下载12月16日全平台公测IT之家12月14日消息,据幻塔手游官方微博,轻科幻开放世界手游幻塔在今日1800开启了预下载,拓荒者下载安装游戏,可以提前享受捏脸的乐趣,还可以分享自己的形象。幻塔官网官方还公布大话西游2单开玩家突然想回来玩!时代变了,过去的标准不管用好久不玩大话的老玩家,偶然间刷到大话2的故事和视频,突然又想玩。你百度了一下哪个区人多,别人说4上去傲雪,里面都是克火的,好混队。你认认真真的去电脑版藏宝阁开始看号,你想着,我就回双城之战海克斯开局DD街区等于吃分最近小编沉迷于打双城之战无法自拔,最近呢也是冲上了钻石局这个分段,开始了彻底地放飞自我。在最近的一场排位赛中,开局就遇到了DD街区,最终呢也是拿下了第一名。在此小编先奉上我的吃鸡图燃烧意志本次先锋2。0,新增爬塔玩法,血战进一步减负诸君安好,雾夏菌报道。14号的中午游戏公布了本次先锋服2。0的大概内容,但有些具体的细节没提到,此外还有很多玩家最关心的正式服啥时候上线2。0?那这一回合就来看一下。1新增爬塔玩法五花八门的BUG也能成为游戏独有的特色玩游戏的时候大家都不想遇到BUG,因为它破坏游戏体验呀!试想一下,当你沉浸在游戏世界之中,却突然遭遇了角色穿模贴图崩坏这种状况,太不爽了吧!下面陪玩菌就给大家盘点一下游戏中让人哭笑停不下来!盘点8款今年发售的超耐玩游戏,几百小时都打不住文卡扇人2021年就快要过去,尽管在疫情的冲击之下,全球游戏业界在这一年之内的状况实在谈不上有多好,很多作品的开发受到了阻碍,但好在我们依然能够找出不少值得我们深入游玩的优秀游戏,阴阳师晴明新皮肤即将上线待升的朝阳透出微弱的光亮,空谷中,回荡着呦呦鹤鸣声。男子翩翩身影自未散尽的晨雾中显现,虽身处隐世,外界事物亦在其掌控中。晴明皮肤玉将无双乌帽下,如瀑银丝轻扬的俊逸男子嘴角微微上挑,农村记忆(8090后小时候的游戏)弹珠我想全国的8090后都玩过吧。玩法很多。最直接的赢弹珠的的方式就是画个圈,大家把同样数量的弹珠放在圈里。用主弹珠撞击圈里的弹珠,撞出圈的弹珠就是自己的。撞出圈可以连击,失误了就腾讯确认游戏开发大神毛星云意外身故从自身方面引发的思考看到这一则新闻,觉得非常痛心。大神的意外身故从自身方面引发我的一些思考。常见的不允许有父母对孩子不优秀的不允许,自己对自己不优秀的不允许。这会进入我们的潜意识,从而给我们很大的压力狂暴传奇会员攻略你知道嘛上到这些会员后有什么好处你知道嘛上观众朋友们大家好,今天和大家讲解一下本游戏前期核心必备提升项目会员等级首先我们要知道本产品人人都可以免费拿到永恒会员,而会员等级的提高可以带来最直观的刷图收益,如何在前期快速升级会王者荣耀后羿攻略,轻松学会上分后羿的定位在ADC当中,后羿之所以用的比较多,主要是技能里控制比较多,大招的眩晕,包括右手左边第一个技能的减速,都可以控制敌人,拥有控制的ADC,用起来会很爽的。携带的召唤师技能因