Skip to content

Consumer: assign / subscribe on run() #133

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged

Conversation

felixschlegel
Copy link
Contributor

Motivation:

We should start assignments / subscriptions when KafkaConsumer.run()
is invoked and not on initialization.

Modifications:

  • KafkaConsumer: move assign / subscribe from init to func run()

Motivation:

We should start assignments / subscriptions when `KafkaConsumer.run()`
is invoked and not on initialization.

Modifications:

* `KafkaConsumer`: move `assign` / `subscribe` from `init` to `func run()`
Comment on lines +327 to +332
switch self.configuration.consumptionStrategy._internal {
case .partition(topic: let topic, partition: let partition, offset: let offset):
try self.assign(topic: topic, partition: partition, offset: offset)
case .group(groupID: _, topics: let topics):
try self.subscribe(topics: topics)
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should probably go through the state machine here

Copy link
Contributor Author

@felixschlegel felixschlegel Sep 21, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

func assign() and func subscribe() already go through the state machine by invoking stateMachine.setupConnection

I could however hoist the state check to func run() (so state check -> check consumption strategy -> assign / subscribe instead of check consumption strategy -> state check -> assign / subscribe)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah got you. Just to double check if somebody called run twice we wouldn't subscribe twice right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it only works when in state .initializing 👍

@FranzBusch FranzBusch merged commit ee7b7c6 into swift-server:main Sep 21, 2023
@FranzBusch FranzBusch deleted the fs-kafka-subscribe-on-run branch September 21, 2023 09:43
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants