Updated sway stuff
This commit is contained in:
142
.config/eww/scripts/mpris2.py
Executable file
142
.config/eww/scripts/mpris2.py
Executable file
@@ -0,0 +1,142 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
from gi.repository import AstalMpris as Mpris, Gio
|
||||
from gi.events import GLibEventLoopPolicy
|
||||
import json
|
||||
import asyncio
|
||||
import sys
|
||||
|
||||
|
||||
PRIORITY_PLAYERS = {"Feishin"}
|
||||
|
||||
|
||||
class MprisMonitor(Gio.Application):
|
||||
"""MPRIS monitor application."""
|
||||
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
self.mpris = Mpris.Mpris.new()
|
||||
self.players: dict[str, Mpris.Player] = {} # type: ignore[annotation-unchecked]
|
||||
self.display_player: Mpris.Player | None = None # type: ignore[annotation-unchecked]
|
||||
|
||||
def connect_player(self, mpris: Mpris.Mpris, player: Mpris.Player):
|
||||
"""Connect a new player."""
|
||||
self.players[player.props.bus_name] = player
|
||||
if self.display_player is None:
|
||||
self.display_player = player
|
||||
|
||||
elif (
|
||||
player.props.identity in PRIORITY_PLAYERS
|
||||
and self.display_player.props.identity not in PRIORITY_PLAYERS
|
||||
):
|
||||
self.display_player = player
|
||||
|
||||
player.connect("notify", self.on_status_update)
|
||||
|
||||
def disconnect_player(self, mpris: Mpris.Mpris, player: Mpris.Player):
|
||||
"""Disconnect a closed player."""
|
||||
print(f"player disconnected: {player.props.bus_name}", file=sys.stderr)
|
||||
existing_player = self.players.get(player.props.bus_name)
|
||||
self.players = {
|
||||
key: val for key, val in self.players.items() if val is not existing_player
|
||||
}
|
||||
if existing_player is self.display_player:
|
||||
self.display_player = None
|
||||
for player in self.players.values():
|
||||
if not self.display_player:
|
||||
self.display_player = player
|
||||
elif (
|
||||
self.display_player.props.identity not in PRIORITY_PLAYERS
|
||||
and player.props.identity in PRIORITY_PLAYERS
|
||||
):
|
||||
self.display_player = player
|
||||
|
||||
if existing_player is not None:
|
||||
existing_player.disconnect_by_func(self.on_status_update)
|
||||
del existing_player
|
||||
|
||||
self.output_status()
|
||||
|
||||
def do_activate(self):
|
||||
"""Activate the application."""
|
||||
self.mpris.connect("player-added", self.connect_player)
|
||||
self.mpris.connect("player-closed", self.disconnect_player)
|
||||
for player in self.mpris.get_players():
|
||||
self.connect_player(self.mpris, player)
|
||||
self.output_status()
|
||||
self.hold()
|
||||
|
||||
def on_status_update(self, player: Mpris.Player, *args):
|
||||
"""Perform status update tasks."""
|
||||
if (
|
||||
player.props.identity in PRIORITY_PLAYERS
|
||||
and player.props.playback_status == Mpris.PlaybackStatus.PLAYING
|
||||
):
|
||||
self.display_player = player
|
||||
elif (
|
||||
player.props.playback_status == Mpris.PlaybackStatus.PLAYING
|
||||
and self.display_player != Mpris.PlaybackStatus.PLAYING
|
||||
):
|
||||
self.display_player = player
|
||||
self.output_status()
|
||||
|
||||
def output_status(self):
|
||||
"""Print the status of the currently active player, or an offline status if no active player exists."""
|
||||
if self.display_player is not None:
|
||||
self.print_player_status(self.display_player)
|
||||
else:
|
||||
self.print_offline_status()
|
||||
|
||||
def print_player_status(self, player: Mpris.Player):
|
||||
"""Print a player's status."""
|
||||
print(
|
||||
json.dumps(
|
||||
{
|
||||
"running": player.props.available
|
||||
and player.props.playback_status != Mpris.PlaybackStatus.STOPPED,
|
||||
"playing": player.props.playback_status
|
||||
== Mpris.PlaybackStatus.PLAYING,
|
||||
"title": player.props.title,
|
||||
"artist": player.props.artist,
|
||||
"album": player.props.album,
|
||||
"album_artist": player.props.album_artist,
|
||||
"position": player.props.position,
|
||||
"position_minutes": int((player.props.position) // 60),
|
||||
"position_seconds": f"{int((player.props.position + 0) % 60):02}",
|
||||
"length": player.props.length,
|
||||
"length_minutes": int((player.props.length + 0) // 60),
|
||||
"length_seconds": f"{int((player.props.length + 0) % 60):02}",
|
||||
"active_player": player.props.identity,
|
||||
}
|
||||
),
|
||||
flush=True,
|
||||
)
|
||||
|
||||
def print_offline_status(self):
|
||||
"""Print an offline status for when we have no players."""
|
||||
print(
|
||||
json.dumps(
|
||||
{
|
||||
"running": False,
|
||||
"playing": False,
|
||||
"title": None,
|
||||
"artist": None,
|
||||
"album": None,
|
||||
"album_artist": None,
|
||||
"position": None,
|
||||
"position_minutes": None,
|
||||
"position_seconds": None,
|
||||
"length": None,
|
||||
"length_minutes": None,
|
||||
"length_seconds": None,
|
||||
"active_player": "",
|
||||
}
|
||||
),
|
||||
flush=True,
|
||||
)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
asyncio.set_event_loop_policy(GLibEventLoopPolicy())
|
||||
app = MprisMonitor()
|
||||
app.run()
|
||||
Reference in New Issue
Block a user