1. Subscription接口介绍
Subscription接口是在Java 9中引入的一个新接口,它是在Java 8中的流(Stream)的基础上进行的扩展。Subscription接口是用来表示一个流中的订阅,包含订阅者的信息和订阅的资源。
Subscription接口定义如下:
public interface Subscription {
public void request(long n);
public void cancel();
}
1.1 Subscription接口方法
request(long n):该方法表示订阅者请求接收n个项目,可以多次调用该方法。
cancel():该方法表示取消订阅。
1.2 实现自定义Subscription方式
在Java 9中,可以通过自定义Subscription来实现对流的订阅,示例代码如下:
public class MySubscription implements Subscription{
private final Flow.Subscriber super Integer> subscriber;
private int requested = 0;
private boolean completed = false;
public MySubscription(Flow.Subscriber super Integer> subscriber) {
this.subscriber = subscriber;
}
public void request(long n) {
if (completed) {
return;
}
synchronized (this) {
requested += n;
if (requested < 0) {
requested = Integer.MAX_VALUE;
}
}
drain();
}
public void cancel() {
completed = true;
}
private void drain() {
while (requested > 0) {
if (completed) {
return;
}
subscriber.onNext(nextItem());
requested--;
}
if (completed) {
subscriber.onComplete();
}
}
private Integer nextItem() {
// 返回下一个需要发送的数据
}
}
2. Subscription接口规则
2.1 调用Subscription.request(long n)方法
当订阅者调用Subscription的request(long n)方法来请求接收n个项目时,下游可能会直接发送许多元素,也可能会逐个发送元素。
示例代码如下:
public class MySubscriber implements Flow.Subscriber{
private Subscription subscription;
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
subscription.request(1);
}
public void onNext(Integer item) {
System.out.println(item);
subscription.request(1);
}
public void onError(Throwable throwable) {
throwable.printStackTrace();
}
public void onComplete() {
System.out.println("Complete!");
}
}
在上面的代码中,订阅者MySubscriber通过Subscription的request(long n)方法请求了1个项目,当流中的元素产生时,会逐个发送元素,并在发送每个元素之后再次调用Subscription的request(long n)方法请求下一个元素。
2.2 调用Subscription.cancel()方法
当订阅者调用Subscription的cancel()方法时,上下游的所有资源都会被释放。订阅者会从下游接收到完成通知,表示流的处理已经结束。
2.3 订阅模式
订阅者和发布者之间的关系是基于订阅模式设计的。在发布-订阅模式中,发布者和订阅者是解耦合的,发布者不需要知道订阅者的数量和身份,订阅者也不需要知道发布者的数量和身份,它们之间通过订阅和发布的消息进行通信。通过订阅模式,可以实现发布者和订阅者之间的解耦合,从而提高系统的灵活性和可维护性。
3. 总结
Subscription接口是Java 9新引入的一个接口,它用于表示一个流中的订阅,包含订阅者和订阅的资源。Subscription接口定义了两个方法:request(long n)和cancel(),用于支持订阅者向上游流发出请求并取消订阅。通过自定义Subscription实现,用户可以灵活地处理流中的元素。在订阅模式下,发布者和订阅者之间通过订阅和发布的消息进行通信,从而实现了解耦合。