Kotlin 协程实现 Vertx 事务管理简单封装
参考项目链接:https://github.com/AiKrai001/vertx-pj
1. 背景介绍
Vertx 库的 SQL 操作较为复杂,事务管理也比较麻烦。本教程将介绍如何使用 Kotlin 协程上下文来简单封装一套简洁的事务管理方案。
2. 基础框架封装
2.1 定义基础接口
首先参考 ebean 等框架的设计,定义一个基础的 Repository 接口,这里示例就写一个execute方法,可自行实现crud等更多方法:
interface Repository<TId, TEntity> {
suspend fun <R> execute(sql: String): R
}
2.2 实现基础接口
创建 Repository 接口的实现类:
import com.fasterxml.jackson.core.type.TypeReference
import io.vertx.kotlin.coroutines.coAwait
import io.vertx.sqlclient.*
import io.vertx.sqlclient.templates.SqlTemplate
import mu.KotlinLogging
import org.aikrai.vertx.jackson.JsonUtil
import java.lang.reflect.ParameterizedType
open class RepositoryImpl<TId, TEntity : Any>(
private val sqlClient: SqlClient
) : Repository<TId, TEntity> {
private val clazz: Class<TEntity> = (this::class.java.genericSuperclass as ParameterizedType)
.actualTypeArguments[1] as Class<TEntity>
override suspend fun <R> execute(sql: String): R {
return if (sql.trimStart().startsWith("SELECT", true)) {
val list = SqlTemplate.forQuery(getConnection(), sql).execute(mapOf())
.coAwait().map { it.toJson() }
val jsonObject = JsonUtil.toJsonObject(list)
val typeReference = object : TypeReference<R>() {}
JsonUtil.parseObject(jsonObject, typeReference, true)
} else {
val rowCount = SqlTemplate.forUpdate(getConnection(), sql).execute(mapOf())
.coAwait().rowCount()
rowCount as R
}
}
}
说明:
TId
: 映射对象主键的类型TEntity
: 映射对象类型sqlClient
: 用于执行 SQL 的客户端实例execute
: 支持执行自定义 SQL 并返回结果
3. 实际应用示例
3.1 创建用户表相关代码
使用 Google Guice 进行依赖注入:
@ImplementedBy(UserRepositoryImpl::class)
interface UserRepository : Repository<Long, User> {
}
class UserRepositoryImpl @Inject constructor(
sqlClient: SqlClient
) : RepositoryImpl<Long, User>(sqlClient), UserRepository {
}
3.2 配置依赖注入
object InjectConfig {
fun configure(vertx: Vertx, configMap: Map<String, Any>): Injector {
return Guice.createInjector(InjectorModule(vertx, configMap))
}
}
class InjectorModule(
private val vertx: Vertx,
private val configMap: Map<String, Any>
) : AbstractModule() {
override fun configure() {
val pool = getDbPool().also { initTxMgr(it) }
bind(Pool::class.java).toInstance(pool)
bind(SqlClient::class.java).toInstance(pool)
}
private fun getDbPool(): Pool {
val url = configMap["databases.url"].toString()
val poolOptions = PoolOptions().setMaxSize(10)
val pool = MySQLBuilder.pool().connectingTo(url).with(poolOptions).using(vertx).build()
return pool
}
}
3.3 说明
基础代码就编写完成了。下面开始编写事务管理相关代码。
想过三种方法实现:使用自定义注解配合Google Guice注册拦截器实现aop,类似spring事务的实现方式,但是Google Guice不支持携程;协程上下文CoroutineContext;使用类似装饰器方式实现。
4. 事务管理实现
4.1 事务上下文元素
class TxCtxElem(
val sqlConnection: SqlConnection,
val transaction: Transaction,
val isActive: Boolean = true,
val isNested: Boolean = false,
val transactionStack: Stack<TxCtxElem>,
val index: Int = transactionStack.size,
val transactionId: String = UUID.randomUUID().toString()
) : CoroutineContext.Element {
companion object Key : CoroutineContext.Key<TxCtxElem>
override val key: CoroutineContext.Key<*> = Key
}
4.2 事务管理器
核心实现类 TxMgr
:
import io.vertx.kotlin.coroutines.coAwait
import io.vertx.sqlclient.Pool
import io.vertx.sqlclient.SqlConnection
import io.vertx.sqlclient.Transaction
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.withContext
import mu.KotlinLogging
import org.aikrai.vertx.utlis.Meta
import java.util.*
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicReference
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.coroutineContext
suspend fun <T> withTransaction(block: suspend CoroutineScope.() -> T): Any? {
return TxMgrHolder.txMgr.withTransaction(block)
}
object TxMgrHolder {
private val _txMgr = AtomicReference<TxMgr?>(null)
val txMgr: TxMgr
get() = _txMgr.get() ?: throw Meta.failure(
"TransactionError",
"TxMgr(TransactionManager) 尚未初始化。请先调用 initTxMgr()。"
)
fun initTxMgr(pool: Pool) {
if (_txMgr.get() != null) return
val newManager = TxMgr(pool)
_txMgr.compareAndSet(null, newManager)
}
}
class TxMgr(
private val pool: Pool
) {
private val logger = KotlinLogging.logger { }
private val transactionStackMap = ConcurrentHashMap<CoroutineContext, Stack<TxCtxElem>>()
suspend fun <T> withTransaction(block: suspend CoroutineScope.() -> T): Any? {
val currentContext = coroutineContext
val transactionStack = currentContext[TxCtxElem]?.transactionStack ?: Stack<TxCtxElem>()
// 外层事务,嵌套事务,都创建新的连接和事务。实现外层事务回滚时所有嵌套事务回滚,嵌套事务回滚不影响外部事务
val connection: SqlConnection = pool.connection.coAwait()
val transaction: Transaction = connection.begin().coAwait()
return try {
val txCtxElem =
TxCtxElem(connection, transaction, true, transactionStack.isNotEmpty(), transactionStack)
transactionStack.push(txCtxElem)
logger.debug { (if (txCtxElem.isNested) "嵌套" else "") + "事务Id:" + txCtxElem.transactionId + "开始" }
withContext(currentContext + txCtxElem) {
val result = block()
if (txCtxElem.index == 0) {
while (transactionStack.isNotEmpty()) {
val txCtx = transactionStack.pop()
txCtx.transaction.commit().coAwait()
logger.debug { (if (txCtx.isNested) "嵌套" else "") + "事务Id:" + txCtx.transactionId + "提交" }
}
}
result
}
} catch (e: Exception) {
logger.error(e) { "Transaction failed, rollback" }
if (transactionStack.isNotEmpty() && !transactionStack.peek().isNested) {
// 外层事务失败,回滚所有事务
logger.error { "Rolling back all transactions" }
while (transactionStack.isNotEmpty()) {
val txCtxElem = transactionStack.pop()
txCtxElem.transaction.rollback().coAwait()
logger.debug { (if (txCtxElem.isNested) "嵌套" else "") + "事务Id:" + txCtxElem.transactionId + "回滚" }
}
throw e
} else {
// 嵌套事务失败,只回滚当前事务
val txCtxElem = transactionStack.pop()
txCtxElem.transaction.rollback().coAwait()
logger.debug(e) { (if (txCtxElem.isNested) "嵌套" else "") + "事务Id:" + txCtxElem.transactionId + "回滚" }
}
} finally {
if (transactionStack.isEmpty()) {
transactionStackMap.remove(currentContext) // 清理上下文
connection.close() // 仅在外层事务时关闭连接
}
}
}
}
实现说明:
- 使用协程上下文管理事务状态
- 支持事务嵌套,采用独立连接策略
- 实现事务提交和回滚的完整生命周期管理
4.3 修改 Repository 实现
open class RepositoryImpl<TId, TEntity : Any>(
private val sqlClient: SqlClient
) : Repository<TId, TEntity> {
private suspend fun getConnection(): SqlClient {
return if (TxCtx.isTransactionActive(coroutineContext)) {
TxCtx.currentSqlConnection(coroutineContext) ?: sqlClient
} else {
sqlClient
}
}
// ... 其他实现代码
}
5. 使用方式
suspend fun testTransaction() {
withTransaction {
val execute1 = userRepository.execute<Int>(
"update sys_user set email = '88888' where user_name = '运若汐'"
)
println("运若汐: $execute1")
throw Meta.failure("test transaction", "test transaction")
}
}
6. 注意事项
- withTransaction中包裹的方法必须使用 RepositoryImpl 中使用 getConnection() 获取连接执行sql的方法。
- 不能在 withTransaction 中开启新的协程上下文
- 事务尽量不要嵌套,目前嵌套处理逻辑是外层事务回滚会导致所有嵌套事务回滚,嵌套事务回滚不影响外层事务,可以自行修改。
- 事务尽量不要嵌套,可能导致性能问题。
- 同级事务会被视为嵌套事务处理。
若有错误多多包涵,可左下角邮箱联系。
License:
CC BY 4.0