Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Recording already finished, but want to test beamlime. #240

Open
Tracked by #239
YooSunYoung opened this issue Oct 3, 2024 · 0 comments
Open
Tracked by #239

Recording already finished, but want to test beamlime. #240

YooSunYoung opened this issue Oct 3, 2024 · 0 comments
Assignees
Labels
enhancement New feature or request

Comments

@YooSunYoung
Copy link
Member

YooSunYoung commented Oct 3, 2024

As explained in the #239, there are different scenarios of run-start/run-stop status.

2, 3 will be simple to implement and it'll support most cases for users
but 1 should be implemented first since it'll enable the fast/quick tests with the existing records.

  1. Listener should find the latest run-start message from the topic and retrieve the timestamp and the job-id.
  2. Listener should find set the time-offset for finding the run-stop message and match it with the job-id.
  3. Listener should start consuming the event/log data between those two time-stamps

Here is a script that quickly check last 10 messages in coda.

import os

def test():
    from rich import print
    from confluent_kafka import Consumer
    from streaming_data_types.run_stop_6s4t import deserialise_6s4t
    from streaming_data_types.run_start_pl72 import deserialise_pl72

    consumer = Consumer(
        {
            "bootstrap.servers": f"{os.environ['KAFKA_SERVER']}:{os.environ['KAFKA_PORT']}",
            "group.id": "beamlime",
            "auto.offset.reset": "earliest",
            "enable.auto.commit": False,  # Not to leave commit of this testing-group.
        }
    )
    try:
        consumer.subscribe(["ess_filewriter_pool"])
        for _ in range(10):
            msg = consumer.poll(timeout=2.0)
            print(msg)
            print(msg.value()[4:8].decode())
            pl72 = deserialise_pl72(msg.value())
            print(pl72.instrument_name)
    except Exception as e:
        consumer.close()
        raise e
    else:
        consumer.close()

if __name__ == "__main__":
    test()
@YooSunYoung YooSunYoung added the enhancement New feature or request label Oct 3, 2024
@YooSunYoung YooSunYoung self-assigned this Oct 3, 2024
@YooSunYoung YooSunYoung moved this from Triage to In progress in Development Board Oct 3, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request
Projects
Status: In progress
Development

No branches or pull requests

1 participant