Cogment High-Level API guide (Python)

Prerequisites

This document assumes the reader is familiar with the Cogment Fundamentals.

The High-level Cogment API expects users to use protocol buffers to declare a project's data structures. The intricacies of protobufs are beyond the scope of this document. Basic knowledge of the technology and its usage is assumed.

The cogment.yaml file

An actor class is primarily defined by its observation space and action space.

The data structures describing these spaces are declared by using a protocol buffer message type. Observations and actions will simply be instances of the matching type.

For example, in the following, driver and pedestrian share a common view of the environment, hence use the same observation space, but have different actions available to them.

import:
  proto:
    - city.proto

actors:
  driver:
    observation:
      space: city.Observation

    action:
      space: city.DriverAction

  pedestrian:
    observation:
      space: city.Observation

    action:
      space: city.PedestrianAction

⚠️ This shows only the relevant part of the full cogment.yaml. The full list of configurable options is available in the reference page.

Compiling the cogment.yaml

To use the cogment.yaml file within Python scripts, it must first be compiled into a Python module. This is done by the cogment CLI (command line interface), which can be installed by following those directions.

$ cogment generate --file /path/to/cogment.yaml --python_dir=./

This will create a cog_settings.py module in the current directory.

The cogment CLI will also compile the imported .proto files into Python modules living in the same location.

Environment

Environments are implemented by a Python function that uses a cogment.EnvironmentSession instance.

This function will be called once for each trial. It usually consists of three sections.

  • The environment's initialization, where its internal state can be initialized and processes started. It ends with the sending of the initial observations to the actors participating in the trial.
  • Its event loop, where the environment iterates through the events occurring during the trial and produces observations as well as receives messages. In this loop the environment can end the trial on its own or the end can be requested by a controller.
  • Its termination, where cleanup occurs.

In the common case where all actors within a trial share the same observation, a bare-minimum environment service would look like this:

async def environment(environment_session):
    # -- Initialization --

    # Retrieve the actors participating in the trial
    actors = environment_session.get_active_actors()

    # Start the trial and send a default observation to all actors
    environment_session.start([("*", Observation())])

    # -- Event loop --
    async for event in environment_session.event_loop():
        if event.actions:
            # `event.actions` is a list of the actions done by the actors (with a 1-1 matching)
            actions = event.actions
            if event.type == cogment.EventType.ACTIVE:
                # The trial is active, produce an observation in response to the actions
                environment_session.produce_observations([("*", Observation())])
                # Alternatively the environment can decide to **end** the trial with the following
                # environment_session.end([("*", Observation())])
            else:
                # The trial termination has been requested by an external controller
                # Produce a final observation
                environment_session.end([("*", Observation())])

        for message in event.messages:
            # `event.messages` is a list of all the messages received by the environment (it can be empty)

            # Handle each message here.
            pass

    # -- Termination --

This environment implementation needs to be registered and served so that the orchestrator can reach it. This can be done through a Context instance.

context = cogment.Context(cog_settings=cog_settings, user_id="my_user_id")

context.register_environment(impl=environment)

await context.serve_all_registered(port=9000)

Sending observations

The environment session has three methods that send observations: start, produce_observations and end. Each of them takes a list of (destination, observation) 2-tuples.

As demonstrated above, sending the same observation to all actors is done using "*" as the destination.

environment_session.produce_observations([("*", Observation(...))])

It is also possible to send different observations to different actors. This can be useful to send observations of the world from the point of view of the actor or to send partial observations.

environment_session.produce_observations([
  ("my_first_actor_name", Observation(...)),
  ("my_second_actor_name", Observation(...))
])

Please note that the environment should always send observations such that each actor in the trial receives one.
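
One way to uphold this rule is to build the destination list from the actor names and check its coverage before sending. The following is a plain-Python sketch only: `build_observations` and `check_coverage` are hypothetical helpers, not part of the Cogment SDK, and observations are represented as dicts rather than protobuf messages.

```python
def build_observations(actor_names, observe_for):
    """Return one (destination, observation) tuple per actor name."""
    return [(name, observe_for(name)) for name in actor_names]


def check_coverage(observations, actor_names):
    """Return the names of the actors that would not receive an observation."""
    destinations = {destination for destination, _ in observations}
    if "*" in destinations:
        # The wildcard destination reaches every actor in the trial.
        return []
    return [name for name in actor_names if name not in destinations]


actor_names = ["my_first_actor_name", "my_second_actor_name"]
# Hypothetical per-actor observation: each actor gets its own point of view.
observations = build_observations(actor_names, lambda name: {"viewer": name})
# Dropping one tuple leaves the second actor without an observation.
missing = check_coverage(observations[:1], actor_names)
```

A list that passes the check can then be handed to start, produce_observations or end as shown above.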

Actor

Actor implementations look a lot like the environment's. They take a cogment.ActorSession instance and have the same three sections: initialization, event loop and termination.

The event loops in Actors' implementations handle three basic types of events:

  • observations produced by the environment, each of which should lead to the actor doing an action.
  • rewards sent by other actors or the environment, discussed in more detail below.
  • messages sent by other actors or the environment, discussed in more detail below.

A typical actor implementation would look like this:

async def driver_actor(actor_session):
    # -- Initialization --

    # Notify that the actor is ready for the trial to start.
    actor_session.start()

    async for event in actor_session.event_loop():
        if event.observation:
            # `event.observation` is an instance of the Observation produced by the environment
            observation = event.observation
            if event.type == cogment.EventType.ACTIVE:
                # The trial is active, it is expecting the agent to do an action
                actor_session.do_action(DriverAction(...))

        for reward in event.rewards:
            # `event.rewards` is a list of all the rewards received by the actor (it can be empty)

            # Handle each reward here.
            pass

        for message in event.messages:
            # `event.messages` is a list of all the messages received by the actor (it can be empty)

            # Handle each message here.
            pass
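
The shape of this event loop can be exercised without an orchestrator by feeding it a stand-in session. The sketch below is an illustration under assumptions: `FakeActorSession`, `FakeEvent` and the local `EventType` enum are hypothetical substitutes for the Cogment classes, and actions and observations are plain dicts instead of protobuf messages.

```python
import asyncio
from dataclasses import dataclass, field
from enum import Enum


class EventType(Enum):
    # Stand-in for cogment.EventType; only the two states used here.
    ACTIVE = 1
    ENDING = 2


@dataclass
class FakeEvent:
    type: EventType
    observation: object = None
    rewards: list = field(default_factory=list)
    messages: list = field(default_factory=list)


class FakeActorSession:
    """Hypothetical stand-in for cogment.ActorSession, for illustration only."""

    def __init__(self, events):
        self._events = events
        self.started = False
        self.actions = []

    def start(self):
        self.started = True

    def do_action(self, action):
        self.actions.append(action)

    async def event_loop(self):
        # Replay the scripted events as an asynchronous stream.
        for event in self._events:
            yield event


async def driver_actor(actor_session):
    actor_session.start()
    async for event in actor_session.event_loop():
        if event.observation and event.type == EventType.ACTIVE:
            # Reply to each observation of an active trial with one action.
            actor_session.do_action({"accelerate": True})


def run_fake_trial():
    session = FakeActorSession([
        FakeEvent(EventType.ACTIVE, observation={"tick": 0}),
        FakeEvent(EventType.ACTIVE, observation={"tick": 1}),
        FakeEvent(EventType.ENDING, observation={"tick": 2}),
    ])
    asyncio.run(driver_actor(session))
    return session
```

Calling `run_fake_trial()` returns a session that recorded one action per observation received while the trial was active; the final observation, delivered once the trial is ending, triggers no action.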

Service actor / Client actor

A Cogment app can use two types of actors. They are identical in terms of implementation but differ in how they interact with the app's Orchestrator.

Service actors are accessible in the same way the environment is, through a Context instance.

context = cogment.Context(cog_settings=cog_settings, user_id="rps")
context.register_actor(
    impl=actor,
    impl_name="driver_actor",
    actor_classes=["driver"])


await context.serve_all_registered(port=9000)

Please note that this registration is also where the implementation is associated with the one or more actor classes it implements.

Client actors, contrary to Service actors, are not served to the orchestrator. They connect as clients of the orchestrator and join a trial that has started.

context = cogment.Context(cog_settings=cog_settings, user_id="rps")
context.register_actor(
    impl=actor,
    impl_name="driver_actor",
    actor_classes=["driver"])

# `impl_name` must match the name the implementation was registered under.
await context.join_trial(
    trial_id=trial_id,
    endpoint=cogment.Endpoint("orchestrator:9000"),
    impl_name="driver_actor")

Please note that a trial including one or more client actors will wait for all of them to join before any actor can start processing events.

Due to the different network requirements, client actors are a good fit when implementing a frontend for human actors. In addition to the Python SDK demonstrated above, client actors can be implemented in JavaScript using the corresponding SDK.

Controller

Trials are started by clients of the Orchestrator using a Controller. Instances of a controller are built from the Context instance and connect to an Orchestrator endpoint.

controller = context.get_controller(
  endpoint=cogment.Endpoint("orchestrator:9000")
)

The controller can then be used to create trials and request their termination.

trial_id = await controller.start_trial(trial_config=TrialConfig())

# ...

await controller.terminate_trial(trial_id)

The controller can also be used to subscribe to events occurring in the trials run by the Orchestrator it connects to. For example, this can be used to wait for a trial's end:

async for trial_info in controller.watch_trials(trial_state_filters=[cogment.TrialState.ENDED]):
    print(f"The trial having id {trial_info.trial_id} ended.")

The full documentation for the controller can be found here.

Rewards

Creating

Rewards are sent to actors from another actor or the environment. The session instance passed to their implementation can be used for this purpose.

session.add_reward(
  value=-1,
  confidence=1,
  tick_id=-1,
  to=['an_actor_name'])

Rewards consist of an arbitrary numerical value describing how the reward "sender" believes the actor performed. It is weighted by a value between 0 and 1 qualifying the confidence of the "sender" in its reward, from a very low confidence just above 0 to a very high confidence approaching 1. The confidence value is used to collate all the rewards sent to an actor at the same time. Optionally, a reward can be provided with arbitrary user data.

Each reward applies to a list of recipients (either all the actors, all the actors of a given class or a specific actor) at a specific point in time, during the trial, defined as a tick.

The full documentation for session.add_reward can be found here.

Consuming

All the rewards that are sent and destined to each specific actor for a given point in time are collated together by the framework.
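
As a rough illustration of what collating by confidence could look like, the sketch below computes a confidence-weighted mean of several source rewards. This is an assumption made for the sake of the example: the exact aggregation formula used by the framework is not stated here, and `collate_rewards` is a hypothetical helper, not a Cogment function.

```python
def collate_rewards(sources):
    """Confidence-weighted mean of (value, confidence) pairs.

    Assumption: this formula is illustrative and is not taken from Cogment.
    """
    total_confidence = sum(confidence for _, confidence in sources)
    if total_confidence == 0:
        # No sender expressed any confidence, nothing to aggregate.
        return 0.0
    weighted_sum = sum(value * confidence for value, confidence in sources)
    return weighted_sum / total_confidence


# Two senders reward the same actor for the same tick.
sources = [(-1.0, 1.0), (1.0, 0.5)]
aggregated = collate_rewards(sources)
```

Under this assumption, the fully confident negative reward outweighs the half-confident positive one, so the aggregated value is negative.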

The actor can take rewards into account directly as the trial is running by consuming the reward events in its event loop.

async for event in actor_session.event_loop():
    # [...]
    for reward in event.rewards:
        # `reward.tick_id` is the id of the tick this reward concerns.
        tick_id = reward.tick_id
        # `reward.value` is the aggregated value of the reward.
        value = reward.value
        for (src_value, src_confidence, sender, user_data) in reward.all_sources():
            # Iterate over individual source rewards.
            pass

Messages

Creating

Messages can be created and sent between actors or the environment within a trial using their session instance.

session.send_message(
  user_data=MyProtobufDataStructure(...), # any protobuf data structure can be used here.
  to=['pedestrian:*'], # send the message to all the actors of the "pedestrian" class
  to_environment=False)

Messages consist of an arbitrary payload, their user_data, defined as an instance of any protobuf data structure.

A message can be sent to one, many or all actors in a trial and / or to the environment.

The full documentation for session.send_message can be found here.

Consuming

All the messages that are sent and intended for each specific actor or environment will be received by the target actor or environment.

Actors or the environment can use messages directly, live, as the trial is running by consuming message events in their event loop.

async for event in actor_session.event_loop():
    # [...]
    for message in event.messages:
        # `message.sender_name` is the name of the actor who sent a message
        sender_name = message.sender_name
        # `message.payload` is the content of the message, it needs to be unpacked
        payload = message.payload

Delta Encoding

By default, observations are sent in their entirety from the environment to the actors. However, it is fairly common for only a small portion of an observation to change from one update to the next.

Cogment allows you to specify a separate data structure to encode partial observation updates. However, if you do so, you must provide a method that can apply the deltas to previous observations.

# delta.py
def apply_delta(previous_observation, delta):
    # Return the updated observation, more often
    # than not, this should be the previous
    # observation that was modified in-place.
    previous_observation.car_position = delta.new_car_pos
    return previous_observation

# cogment.yaml
import:
  proto:
    - city.proto
  python:
    - delta

actors:
  my_class:
    observation:
      space: city.Observation
      delta: city.ObservationDelta
      delta_apply_fn:
        python: delta.apply_delta
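
To see how such a function behaves in isolation, the sketch below runs apply_delta on dataclasses standing in for the protobuf messages. The `Observation` and `ObservationDelta` classes and the `light_is_green` field are hypothetical; only `car_position` and `new_car_pos` come from the example above.

```python
from dataclasses import dataclass


@dataclass
class Observation:
    # Stand-in for city.Observation; `light_is_green` is a hypothetical field.
    car_position: int
    light_is_green: bool


@dataclass
class ObservationDelta:
    # Stand-in for city.ObservationDelta: carries only what changed.
    new_car_pos: int


def apply_delta(previous_observation, delta):
    # Update the previous observation in place and return it.
    previous_observation.car_position = delta.new_car_pos
    return previous_observation


observation = Observation(car_position=3, light_is_green=True)
updated = apply_delta(observation, ObservationDelta(new_car_pos=4))
```

Only the car position travels in the delta; the fields it does not mention keep their previous values, and the same object is returned because the update happens in place.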