Skip to content

Commit edf3079

Browse files
author
韩书航
committed
Supports fully asynchronous Engine initialization
1 parent 9383500 commit edf3079

File tree

3 files changed

+33
-12
lines changed

3 files changed

+33
-12
lines changed

kyuubi-server/src/main/scala/org/apache/kyuubi/operation/ExecuteStatement.scala

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@ class ExecuteStatement(
7676

7777
private def waitStatementComplete(): Unit =
7878
try {
79+
executeStatement()
7980
setState(OperationState.RUNNING)
8081
var statusResp: TGetOperationStatusResp = null
8182

@@ -147,7 +148,18 @@ class ExecuteStatement(
147148
}
148149
// see if anymore log could be fetched
149150
fetchQueryLog()
150-
} catch onError()
151+
} catch onError().andThen(_ => {
152+
if (isClosedOrCanceled && _remoteOpHandle != null) {
153+
// close remote operation in case user cancels before engine launched.
154+
// In this case, CANCELED/CLOSED -> RUNNING transition will definitely throw Exception.
155+
client.cancelOperation(_remoteOpHandle)
156+
}
157+
if (getSession.sessionManager.getSessionOption(getSession.handle).isEmpty) {
158+
// close remote session in case user closes session before engine launched
159+
logger.warn(s"Session ${getSession.handle} has been closed, close remote session now")
160+
client.closeSession()
161+
}
162+
})
151163
finally {
152164
shutdownTimeoutMonitor()
153165
}
@@ -168,7 +180,6 @@ class ExecuteStatement(
168180
if (isTimeoutMonitorEnabled) {
169181
addTimeoutMonitor(queryTimeout)
170182
}
171-
executeStatement()
172183
val sessionManager = session.sessionManager
173184
val asyncOperation: Runnable = () => waitStatementComplete()
174185
try {

kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperation.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,7 @@ abstract class KyuubiOperation(session: Session) extends AbstractOperation(sessi
172172
}
173173

174174
override def getResultSetMetadata: TGetResultSetMetadataResp = {
175-
if (_remoteOpHandle == null) {
175+
if (_remoteOpHandle == null || !hasResultSet) {
176176
val tColumnDesc = new TColumnDesc()
177177
tColumnDesc.setColumnName("Result")
178178
val desc = new TTypeDesc

kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,14 @@ class KyuubiSessionImpl(
104104
}
105105

106106
@volatile private var _client: KyuubiSyncThriftClient = _
107-
def client: KyuubiSyncThriftClient = _client
107+
def client: KyuubiSyncThriftClient = {
108+
if (sessionConf.get(SESSION_ENGINE_LAUNCH_ASYNC)) {
109+
// When `SESSION_ENGINE_LAUNCH_ASYNC` is set to true,
110+
// launch engine when client used for the first time.
111+
waitForEngineLaunched()
112+
}
113+
_client
114+
}
108115

109116
@volatile private var _engineSessionHandle: SessionHandle = _
110117

@@ -180,8 +187,9 @@ class KyuubiSessionImpl(
180187
_engineSessionHandle =
181188
engineClient.openSession(protocol, user, passwd, openEngineSessionConf)
182189
_client = engineClient
183-
logSessionInfo(s"Connected to engine [$host:$port]/[${client.engineId.getOrElse("")}]" +
184-
s" with ${_engineSessionHandle}]")
190+
logSessionInfo(
191+
s"Connected to engine [$host:$port]/[${_client.engineId.getOrElse("")}]" +
192+
s" with ${_engineSessionHandle}]")
185193
shouldRetry = false
186194
} catch {
187195
case e: TTransportException
@@ -234,12 +242,14 @@ class KyuubiSessionImpl(
234242

235243
override protected def runOperation(operation: Operation): OperationHandle = {
236244
if (operation != launchEngineOp) {
237-
try {
238-
waitForEngineLaunched()
239-
} catch {
240-
case t: Throwable =>
241-
operation.close()
242-
throw t
245+
if (!sessionConf.get(SESSION_ENGINE_LAUNCH_ASYNC)) {
246+
try {
247+
waitForEngineLaunched()
248+
} catch {
249+
case t: Throwable =>
250+
operation.close()
251+
throw t
252+
}
243253
}
244254
sessionEvent.totalOperations += 1
245255
}

0 commit comments

Comments
 (0)