ContinuousSessionContext

持续会话的作用域, 通过此作用域在监听函数监听过程中进行会话嵌套。

waiting 中注册的持续会话监听函数将会在所有监听函数被触发前, 整体性的依次执行。 并且因为 ContinuousSessionSelector.invoke 不存在返回值, 因此所有的临时会话监听函数均无法对任何正常的监听流程产生影响,也无法参与到正常流程中的结果返回中。

在事件处理流程中,包含了持续会话监听函数的情况大概如下所示:

          + ----- +
| Event |
+ ------+

| push


+ -------------- +
| EventProcessor |
+ ---------------+
|
| 构建Context
|
+ ---------------- +
| 持续会话监听函数 | (如果有的话)
+ ---------------- +
|
| (如果没有被使用)
|
+ ---------- +
| 正常处理流程 |
+ ---------- +

一次性

在持续会话中,事件的使用是一次性的。也就是说当一个事件被通过 ContinuousSessionProvider.push 推送后, 将不会被其他持续会话或后续的普通监听函数使用。因为这个事件已经被当前这个持续会话所 取用 了。

仅获取

对于持续会话的使用,你应该尽可能的避免在 ContinuousSessionSelector 中执行逻辑 ———— 你应当在 ContinuousSessionSelector 更多的做选择与获取,而不是做逻辑处理

如下示例:

val value = session.waitingFor(FooEvent) { provider ->
if (... && ... && ...) {
provider.push(...)
}
}

// 逻辑处理
useValue(value)

你应当通过持续会话来获取值,然后在你的监听主流程中进行业务逻辑。 同样的原因,你也不应该在持续会话中执行任何异步任务。

provider & receiver

ContinuousSessionContext 中,不论是 provider 还是 receiver, 它们都会在一次会话结束(使用了能够导致 ContinuousSessionProvider.isCompleted == true 的函数 )后被移除。 因此当会话结束后,不论 provider 还是 receiver 都会变为null。 如果你希望得到这次会话的某个返回值,你需要通过 waiting 挂起等待。

对于较为简单的会话嵌套,(在kotlin中)你可以使用以下方式轻松完成:

suspend fun EventListenersGenerator.fooListener() {
GroupEvent { event: GroupEvent -> // this: EventListenerProcessingContext
// 获取 session context. 正常情况下不可能为null
val sessionContext = context.getAttribute(EventProcessingContext.Scope.ContinuousSession) ?: error("不支持会话!")

// 注册并等待一个持续会话监听函数提供结果. 此处挂起并等待
val num: Int = sessionContext.waiting { provider -> // this: EventProcessingContext
// 这里使用的是 Event.Key 作为判断方式。
// 当然,也可以用 is 判断: if (event is ChannelEvent)
if (event.key isSub ChannelEvent) {
// 假如监听到一个事件为 ChannelEvent 类型, 推送结果.
provider.push(1)
}
}

assert(num == 1)
}
}

不过假如你的逻辑比较复杂,可能在 waiting 中仍需要嵌套更多的其他listener,那么对函数进行拆分也许是个不错的选择。比如:

suspend fun EventListenersGenerator.myListener() {
GroupEvent { event: GroupEvent -> // this: EventListenerProcessingContext
// 获取 session context, 并认为它不为null。
val sessionContext = context.getAttribute(EventProcessingContext.Scope.ContinuousSession) ?: error("不支持会话!")

// 假设你需要获取2个数字,每个数字中同样需要再次等待一个数字。

val num1 = sessionContext.waitingFor(listener = EventProcessingContext::getNum1)
val num2 = sessionContext.waitingFor(listener = ::getNum2)

assert((num1 + num2) == 10) // sum: 10
null
}
}

suspend fun EventProcessingContext.getNum1(provider: ContinuousSessionProvider<Int>): Unit = 4

suspend fun getNum2(context: EventProcessingContext, provider: ContinuousSessionProvider<Int>): Unit = 6

如果你有明确的监听类型目标,你也可以使用 nextXxxwaitingForXxx 的相关函数来简化类型匹配:

超时处理

在 Kotlin 中,你可以使用 withTimeout 来包裹诸如 waiting 或者 next 等这类挂起函数来控制超时时间。

val session: ContinuousSessionContext = ...

// throw Exception if timeout
withTimeout(5.seconds) {
session.waiting { provider -> // this: EventProcessingContext
// ...
}
}

而对于 Java 开发者,绝大多数相关的函数中都提供了 timeouttimeUnit 参数来控制超时时间。

会话清除

当你通过 waiting 或者其他相关函数注册了一个持续会话监听函数之后,在出现以下情况后,他们会被清除:

Author

ForteScarlet

See also

Constructors

Link copied to clipboard

Functions

Link copied to clipboard

尝试通过ID获取一个 provider.

Link copied to clipboard

尝试通过ID获取一个 receiver.

Link copied to clipboard
suspend fun Event.next(id: String = randomIdStr()): Event
suspend fun <E : Event> Event.next(key: Event.Key<E>): E
suspend fun <E : Event> Event.next(id: String, key: Event.Key<E>): E

挂起并等待在具体的事件 Event 环境下根据条件获取下一个匹配的 事件.

suspend fun EventProcessingContext.next(id: String = randomIdStr()): Event
suspend fun <E : Event> EventProcessingContext.next(key: Event.Key<E>): E
suspend fun <E : Event> EventProcessingContext.next(id: String, key: Event.Key<E>): E

挂起并等待在当前的事件处理上下文 EventProcessingContext 中根据条件获取下一个匹配的 事件.

Link copied to clipboard
@JvmName(name = "next")
fun Event.nextBlocking(id: String = randomIdStr(), timeout: JavaDuration = JavaDuration.ZERO): Event
@JvmName(name = "next")
fun <E : Event> Event.nextBlocking(key: Event.Key<E>, timeout: JavaDuration = JavaDuration.ZERO): E
@JvmName(name = "next")
fun Event.nextBlocking(id: String = randomIdStr(), timeout: Long, timeUnit: TimeUnit): Event
@JvmName(name = "next")
fun <E : Event> Event.nextBlocking(id: String, key: Event.Key<E>, timeout: JavaDuration = JavaDuration.ZERO): E
@JvmName(name = "next")
fun <E : Event> Event.nextBlocking(id: String = randomIdStr(), key: Event.Key<E>, timeout: Long, timeUnit: TimeUnit): E

阻塞并等待在具体的事件 Event 环境下根据条件获取下一个匹配的 事件.

@JvmName(name = "next")
fun EventProcessingContext.nextBlocking(id: String = randomIdStr(), timeout: JavaDuration = JavaDuration.ZERO): Event
@JvmName(name = "next")
fun <E : Event> EventProcessingContext.nextBlocking(key: Event.Key<E>, timeout: JavaDuration = JavaDuration.ZERO): E
@JvmName(name = "next")
fun EventProcessingContext.nextBlocking(id: String = randomIdStr(), timeout: Long, timeUnit: TimeUnit): Event
@JvmName(name = "next")
fun <E : Event> EventProcessingContext.nextBlocking(id: String = randomIdStr(), key: Event.Key<E>, timeout: JavaDuration = JavaDuration.ZERO): E
@JvmName(name = "next")
fun <E : Event> EventProcessingContext.nextBlocking(id: String = randomIdStr(), key: Event.Key<E>, timeout: Long, timeUnit: TimeUnit): E

阻塞并等待在具体的事件处理上下文 EventProcessingContext 环境下根据条件获取下一个匹配的 事件.

Link copied to clipboard

挂起并等待符合当前 Event 作用域下的下一个消息事件的 消息内容.

Link copied to clipboard
@JvmName(name = "nextMessage")
fun Event.nextMessageBlocking(id: String = randomIdStr(), timeout: JavaDuration): MessageContent
@JvmName(name = "nextMessage")
fun Event.nextMessageBlocking(key: Event.Key<out MessageEvent>, timeout: JavaDuration = JavaDuration.ZERO): MessageContent
@JvmName(name = "nextMessage")
fun Event.nextMessageBlocking(id: String = randomIdStr(), key: Event.Key<out MessageEvent> = MessageEvent, timeout: JavaDuration = JavaDuration.ZERO): MessageContent
@JvmName(name = "nextMessage")
fun Event.nextMessageBlocking(key: Event.Key<out MessageEvent>, timeout: Long, timeUnit: TimeUnit): MessageContent
@JvmName(name = "nextMessage")
fun Event.nextMessageBlocking(id: String = randomIdStr(), key: Event.Key<out MessageEvent> = MessageEvent, timeout: Long, timeUnit: TimeUnit): MessageContent

阻塞并等待在具体的 Event 作用域下根据条件获取下一个匹配的 消息事件 中的 消息内容

@JvmName(name = "nextMessage")
fun EventProcessingContext.nextMessageBlocking(id: String = randomIdStr(), timeout: JavaDuration): MessageContent
@JvmName(name = "nextMessage")
fun EventProcessingContext.nextMessageBlocking(key: Event.Key<out MessageEvent>, timeout: JavaDuration = JavaDuration.ZERO): MessageContent
@JvmName(name = "nextMessage")
fun EventProcessingContext.nextMessageBlocking(id: String = randomIdStr(), key: Event.Key<out MessageEvent> = MessageEvent, timeout: JavaDuration = JavaDuration.ZERO): MessageContent
@JvmName(name = "nextMessage")
fun EventProcessingContext.nextMessageBlocking(key: Event.Key<out MessageEvent>, timeout: Long, timeUnit: TimeUnit): MessageContent
@JvmName(name = "nextMessage")
fun EventProcessingContext.nextMessageBlocking(id: String = randomIdStr(), key: Event.Key<out MessageEvent> = MessageEvent, timeout: Long, timeUnit: TimeUnit): MessageContent

阻塞并等待在具体的事件处理上下文 EventProcessingContext 环境下根据条件获取下一个匹配的 消息事件 中的 消息内容

Link copied to clipboard
@JvmName(name = "waiting")
fun <T> waitBlocking(timeout: JavaDuration = JavaDuration.ZERO, blockingListener: BlockingContinuousSessionSelector<T>): T
@JvmName(name = "waiting")
fun <T> waitBlocking(id: String, timeout: JavaDuration = JavaDuration.ZERO, blockingListener: BlockingContinuousSessionSelector<T>): T
@JvmName(name = "waiting")
fun <T> waitBlocking(id: String, timeout: Long, timeUnit: TimeUnit, blockingListener: BlockingContinuousSessionSelector<T>): T

注册一个持续会话监听函数并阻塞的等待.

@JvmName(name = "waiting")
fun <T> waitBlocking(timeout: Long, timeUnit: TimeUnit, blockingListener: BlockingContinuousSessionSelector<T>): T

注册一个持续会话监听函数并阻塞的等待, id随机。

Link copied to clipboard
@JvmName(name = "waitingForNext")
fun waitForNextBlocking(timeoutDuration: JavaDuration = JavaDuration.ZERO, matcher: BlockingContinuousSessionEventMatcher<Event> = BlockingContinuousSessionEventMatcher): Event
@JvmName(name = "waitingForNext")
fun waitForNextBlocking(id: String = randomIdStr(), matcher: BlockingContinuousSessionEventMatcher<Event>): Event
@JvmName(name = "waitingForNext")
fun <E : Event> waitForNextBlocking(key: Event.Key<E>, matcher: BlockingContinuousSessionEventMatcher<E>): E
@JvmName(name = "waitingForNext")
fun waitForNextBlocking(timeout: Long, timeUnit: TimeUnit, matcher: BlockingContinuousSessionEventMatcher<Event> = BlockingContinuousSessionEventMatcher): Event
@JvmName(name = "waitingForNext")
fun waitForNextBlocking(id: String, timeoutDuration: JavaDuration = JavaDuration.ZERO, matcher: BlockingContinuousSessionEventMatcher<Event> = BlockingContinuousSessionEventMatcher): Event
@JvmName(name = "waitingForNext")
fun <E : Event> waitForNextBlocking(id: String, key: Event.Key<E>, matcher: BlockingContinuousSessionEventMatcher<E>): E
@JvmName(name = "waitingForNext")
fun <E : Event> waitForNextBlocking(key: Event.Key<E>, timeoutDuration: JavaDuration = JavaDuration.ZERO, matcher: BlockingContinuousSessionEventMatcher<E> = BlockingContinuousSessionEventMatcher): E
@JvmName(name = "waitingForNext")
fun waitForNextBlocking(id: String, timeout: Long, timeUnit: TimeUnit, matcher: BlockingContinuousSessionEventMatcher<Event> = BlockingContinuousSessionEventMatcher): Event
@JvmName(name = "waitingForNext")
fun <E : Event> waitForNextBlocking(id: String, key: Event.Key<E>, timeoutDuration: JavaDuration = JavaDuration.ZERO, matcher: BlockingContinuousSessionEventMatcher<E> = BlockingContinuousSessionEventMatcher): E
@JvmName(name = "waitingForNext")
fun <E : Event> waitForNextBlocking(key: Event.Key<E>, timeout: Long, timeUnit: TimeUnit, matcher: BlockingContinuousSessionEventMatcher<E> = BlockingContinuousSessionEventMatcher): E
@JvmName(name = "waitingForNext")
fun <E : Event> waitForNextBlocking(id: String, key: Event.Key<E>, timeout: Long, timeUnit: TimeUnit, matcher: BlockingContinuousSessionEventMatcher<E> = BlockingContinuousSessionEventMatcher): E

阻塞并等待下一个符合条件的 事件 对象。

Link copied to clipboard
@JvmName(name = "waitingForNextMessage")
fun waitForNextMessageBlocking(timeout: JavaDuration = JavaDuration.ZERO, matcher: BlockingContinuousSessionEventMatcher<MessageEvent> = BlockingContinuousSessionEventMatcher): MessageContent
@JvmName(name = "waitingForNextMessage")
fun waitForNextMessageBlocking(timeout: Long, timeUnit: TimeUnit, matcher: BlockingContinuousSessionEventMatcher<MessageEvent> = BlockingContinuousSessionEventMatcher): MessageContent
@JvmName(name = "waitingForNextMessage")
fun waitForNextMessageBlocking(id: String, timeout: JavaDuration = JavaDuration.ZERO, matcher: BlockingContinuousSessionEventMatcher<MessageEvent> = BlockingContinuousSessionEventMatcher): MessageContent
@JvmName(name = "waitingForNextMessage")
fun <E : MessageEvent> waitForNextMessageBlocking(id: String = randomIdStr(), key: Event.Key<E>, matcher: BlockingContinuousSessionEventMatcher<E>): MessageContent
@JvmName(name = "waitingForNextMessage")
fun <E : MessageEvent> waitForNextMessageBlocking(key: Event.Key<E>, timeout: JavaDuration = JavaDuration.ZERO, matcher: BlockingContinuousSessionEventMatcher<E> = BlockingContinuousSessionEventMatcher): MessageContent
@JvmName(name = "waitingForNextMessage")
fun waitForNextMessageBlocking(id: String, timeout: Long, timeUnit: TimeUnit, matcher: BlockingContinuousSessionEventMatcher<MessageEvent> = BlockingContinuousSessionEventMatcher): MessageContent
@JvmName(name = "waitingForNextMessage")
fun <E : MessageEvent> waitForNextMessageBlocking(id: String, key: Event.Key<E>, timeout: JavaDuration = JavaDuration.ZERO, matcher: BlockingContinuousSessionEventMatcher<E> = BlockingContinuousSessionEventMatcher): MessageContent
@JvmName(name = "waitingForNextMessage")
fun <E : MessageEvent> waitForNextMessageBlocking(key: Event.Key<E>, timeout: Long, timeUnit: TimeUnit, matcher: BlockingContinuousSessionEventMatcher<E> = BlockingContinuousSessionEventMatcher): MessageContent
@JvmName(name = "waitingForNextMessage")
fun <E : MessageEvent> waitForNextMessageBlocking(id: String, key: Event.Key<E>, timeout: Long, timeUnit: TimeUnit, matcher: BlockingContinuousSessionEventMatcher<E> = BlockingContinuousSessionEventMatcher): MessageContent

阻塞并等待下一个符合条件的 消息事件 中的 消息内容

Link copied to clipboard
abstract suspend override fun <T> waiting(id: String, listener: ContinuousSessionSelector<T>): T

注册一个持续会话监听函数并挂起等待. 如果注册时发现存在 id 冲突的持续会话监听函数,则上一个函数将会被立即关闭处理。

Link copied to clipboard
suspend fun waitingForNext(id: String = randomIdStr(), matcher: ContinuousSessionEventMatcher<Event> = ContinuousSessionEventMatcher): Event

挂起并等待下一个符合 条件事件 对象。

suspend fun <E : Event> waitingForNext(id: String = randomIdStr(), key: Event.Key<E>, matcher: ContinuousSessionEventMatcher<E> = ContinuousSessionEventMatcher): E

挂起并等待下一个符合 类型条件事件 对象。

Link copied to clipboard
suspend fun waitingForNextMessage(id: String = randomIdStr(), matcher: ContinuousSessionEventMatcher<MessageEvent> = ContinuousSessionEventMatcher): MessageContent

挂起并等待下一个符合条件的 消息事件 中的 消息内容

suspend fun <E : MessageEvent> waitingForNextMessage(id: String, key: Event.Key<E>, matcher: ContinuousSessionEventMatcher<E> = ContinuousSessionEventMatcher): MessageContent

挂起并等待下一个符合条件的 消息事件 中的消息体。

Extensions

Link copied to clipboard

进入到 ContinuousSessionContext 上下文中。

Link copied to clipboard
suspend fun <E : Event> ContinuousSessionContext.waitingForNext(k: Event.Key<E>, matcher: ContinuousSessionEventMatcher<E> = ContinuousSessionEventMatcher): E

挂起并等待下一个符合 类型条件事件 对象。

Link copied to clipboard

挂起并等待下一个符合条件的 消息事件 中的消息体。