Skip to main content

Tasks

Concept

So you want to run a background task running at some specified interval? Thankfully, that's a common thing to do, whether you're working with some API or handling some background logic don't worry, the Pycord tasks extension is here to make life easier for you!

With the tasks extension, you can make background tasks without worrying about the asyncio hell and mind bogglers like "what if my internet dies" and "how do I handle reconnecting" with the added benefit of a lot of useful additions that are one line of code away!

Example Usage

import discord
from discord.ext import tasks

bot = discord.Bot()

@bot.event
async def on_ready():
very_useful_task.start()

@tasks.loop(seconds=5)
async def very_useful_task():
print('doing very useful stuff.')

bot.run("TOKEN")

If you do run this code in your terminal you should see something like this after about 15 seconds:

doing very useful stuff.
doing very useful stuff.
doing very useful stuff.

For a more useful example here's a task in a cog context ripped straight from the docs:

from discord.ext import tasks, commands

class MyCog(commands.Cog):
def __init__(self):
self.index = 0
self.printer.start()

def cog_unload(self):
self.printer.cancel()

@tasks.loop(seconds=5)
async def printer(self):
print(self.index)
self.index += 1

As you'd expect this will increment a number and print it out every 5 seconds like so:

0
# 5 seconds later
1
# 5 more seconds later
2
# ...

Tasks

Now let's get into the nitty-gritty of tasks.

As you've seen tasks can work in both outer scope and class contexts and the handle is roughly the same, you define a task using the tasks.loop decorator and use the start method to start it, the difference is you add the self argument when in a class context so since most people have all their bot's code in Cogs all the following code blocks will be in a Cog context to make it easy to copy it then apply whatever modifications you might need!

Creating a task

A look at discord.ext.tasks.loop's definition in the documentation reveals that:

  1. As you've seen before it expects the time between each execution, that however doesn't have to be in seconds as there are seconds, minutes and hours parameters to make it easier for us, they can also be floats in case you want ultra-precision.
  2. You can also pass in a datetime.time object or a sequence of them in the time parameter which will make it run at the specified time(s).
  3. You can pass in how many times a loop runs before it stops by passing in the count parameter, you can use None to make it run forever which is also the default.
  4. The loop function must be a coroutine.

These are all really useful and they aren't the only parameters so if you wanna know more about them check out the docs!

Attributes

A task has the following attributes:

  • current_loop: The current loop the task is on.
  • hours, minutes, seconds: attributes that represent the time between each execution.
  • time: A list of datetime.time objects that represent the times the task will run, returns None if no datetime objects were passed.
  • next_iteration: A datetime.datetime object that represents the next time the next iteration of the task will run, can return None if the task stopped running.

These attributes serve as a really powerful asset to get info about your loop.

Example

Now let's create a cog that handles a leaderboard we have in our bot using Tasks then explain what we did after that and also provide a refresher of how slash commands groups work in cogs.

For the sake of this example let's pretend that we have a leaderboard module that does all the leaderboard handling for us and that computing the leaderboard is very expensive computationally wise so we want to store one version of it that gets regularly updated instead of generating it every time someone calls the /leaderboard view command.

from discord.ext import tasks, commands
from discord.commands import SlashCommandGroup, CommandPermissions

from leaderboard import * # Mock leaderboard module.

class LeaderboardCog(commands.Cog):
def __init__(self, bot):
self.bot = bot
self.update_leaderboard.start()
self.leaderboard_embed = generate_leaderboard_embed()

leaderboard = SlashCommandGroup(name='leaderboard', description='Leaderboard commands.')

@tasks.loop(minutes=10)
async def update_leaderboard(self):
print('Updating leaderboard...')
await update_leaderboard()
self.leaderboard_embed = generate_leaderboard_embed()

@update_leaderboard.before_loop
async def before_update_leaderboard(self):
await self.bot.wait_until_ready()
print("About to start updating leaderboard.")

@update_leaderboard.after_loop
async def after_update_leaderboard(self):
leaderboard_cleanup()
print("Stopped updating leaderboard.")

@update_leaderboard.error
async def update_leaderboard_error(self, error):
print(f"Oh no, an error occurred while updating the leaderboard. Error: {error}")

@leaderboard.command()
async def view(self, ctx):
await ctx.respond(embed=self.leaderboard_embed)

@leaderboard.command()
async def update_info(self, ctx):
await ctx.respond(f"""***Leaderboard Info***\n
Last update: {(600 - self.update_leaderboard.next_iteration.timestamp())//60} minutes ago.\n
Next update: in {self.update_leaderboard.next_iteration.timestamp() // 60} minutes.\n
Time between loops: {self.update_leaderboard.minutes} minutes.\n
Times updated this session: {self.update_leaderboard.current_loop}.
Running? {self.update_leaderboard.is_running()}
Failed? {self.update_leaderboard.failed()}""", ephemeral=True)

# Now for the owner only commands:

leaderboard_config = SlashCommandGroup(name="leaderboard_config", description="Leaderboard configuration commands", permission=[CommandPermissions("owner", 2, True)])

@leaderboard_config.command()
async def set_time_between_updates(self, ctx, minutes: int):
self.update_leaderboard.change_interval(minutes=minutes)
await ctx.respond(f"Set time between updates to {minutes} minutes.")

@leaderboard_config.command()
async def stop(self, ctx):
self.update_leaderboard.stop()
await ctx.respond("Stopped the leaderboard update.")

@leaderboard_config.command()
async def restart(self, ctx):
self.update_leaderboard.restart()
await ctx.respond("Restarted the leaderboard update.")

def setup(bot):
bot.add_cog(LeaderboardCog(bot))

Phew, that was quite a bit of code.

Now to explain what's going on:

First things first we create a cog and in its __init__ function we start the update_leaderboard task and generate the first instance of our leaderboard's embed.

After that, we define the update_leaderboard task using the loop decorator and we make it run every ten minutes by passing 10 to the minutes parameter.

Then that we define the before_update_leaderboard task using the before_loop decorator and we make it wait until the bot is ready before starting the task.

Next up we define the after_update_leaderboard task using the after_loop decorator and we make it clean up the leaderboard when the loop finally stops running.

Then we define the update_leaderboard_error task using the error decorator and we make it print any errors that may occur while we update our leaderboard to the console.

Then we get into the fun stuff, we create a slash command group using the SlashCommandGroup class and name it leaderboard, we then create the view sub-command which sends the embed generated when the leaderboard is updated and update_info which shows a lot of data about the task using some of the attributes and functions available to us.

We also create a slash command group called leaderboard_config which is only available to the owner of the bot.

Then we create the set_time_between_updates sub-command which takes in the time in minutes and changes the time between updates of the leaderboard using the change_interval method, the stop sub-command which stops the task using the stop method and restart which restarts the task using the restart method.

Finally, we add the LeaderboardCog cog to the bot and we're done!

I'd highly recommend you check the tasks extension documentation for more info on how to use them and what other functions they have.