Après lecture, on comprend que les auteurs ont a disposition une base de données de "séquences de contrôles" correspondant aux actions effectués par des utilisateurs d'un service de vidéos à la demande. Ils cherchent à détecter en temps réel quelles séquences doivent être considérées comme des anomalies, afin de corriger des problèmes pour améliorer l'expérience utilisateur.
Pour ce faire, ils commencent par appliquer l'algorithme des k-means pour diviser les séquences en $K$ groupes (en testant des valeurs de $K$ allant de 2 à 20). Pas de détails sur l'espace de représentation des données. Sans plus d'informations on peut considérer qu'il s'agissait plutôt d'un k-médoïdes, car il suffit alors de définir des distances (déjà pas évident). Le groupe contenant le moins d'individus est considéré comme le groupe des anomalies.
On construit ensuite $K$ chaînes de Markov, une par groupe, en comptant les transitions (cf. [*]). Le principe est alors de déterminer les probabilités de la (nouvelle) séquence en cours selon chaque chaîne : cette dernière est associée au cluster ayant la probabilité d'occurrence la plus haute. Si une séquence se retrouve classée dans le groupe des anomalies à au moins deux échelles de temps sur les trois, alors on la considère comme une anomalie.
[*] : par exemple si l'on observe A -> B -> C -> B -> A -> C -> A -> C, il y a 1 transition de A vers B et 2 de A vers C : on en déduit les probabilités 0 1/3 2/3 d'aller depuis A respectivement en A, B et C. Depuis B on observe seulement B -> C et B -> A (une fois chacune), donc 1/2 0 1/2. Enfin depuis C, C -> B et C -> A (une occurrence de chaque) => 1/2 1/2 0. Finalement la matrice déduite de ce petit exemple serait
$$\begin{pmatrix} 0 & 1/3 & 2/3\\ 1/2 & 0 & 1/2\\ 1/2 & 1/2 & 0\end{pmatrix}$$Dans un premier temps on ne considère que nos actions : le graphe a 3 (ou 5) sommets correspondants aux choix possibles.
import numpy as np
# Ordre des états: P, R, S
P_rock = np.matrix([[0.25, 0.5, 0.25], [0.25, 0.5, 0.25], [0.25, 0.5, 0.25]])
P_paper = np.matrix([[0.5, 0.25, 0.25], [0.5, 0.25, 0.25], [0.5, 0.25, 0.25]])
P_scissor = np.matrix([[0.25, 0.25, 0.5], [0.25, 0.25, 0.5], [0.25, 0.25, 0.5]])
P_change = np.matrix([[0.2, 0.4, 0.4], [0.4, 0.2, 0.4], [0.4, 0.4, 0.2]])
P_keep = np.matrix([[0.5, 0.25, 0.25], [0.25, 0.5, 0.25], [0.25, 0.25, 0.5]])
P_balance = np.matrix([[1/3, 1/3, 1/3], [1/3, 1/3, 1/3], [1/3, 1/3, 1/3]])
agents = { 'rock': P_rock, 'paper': P_paper, 'scissor': P_scissor,
'change': P_change, 'keep': P_keep, 'balance': P_balance }
actions = { 'P': 0, 'R': 1, 'S': 2 }
scores = np.matrix([[0.5, 1, 0], [0, 0.5, 1], [1, 0, 0.5]])
import random
def playRandomGuy(seed=None, cheat=False):
global actions, scores
actionsArray = list(actions.keys())
random.seed(seed)
agentCode = random.choices(list(agents.keys()))[0]
if cheat:
print(agentCode)
P = agents[agentCode]
curState = None
myPoints, userPoints = 0, 0
while True:
# Attente d'une action utilisateur :
choice = input('Move?').upper()
if choice not in actionsArray:
break
# Choix d'une action (programme) :
if curState is None:
curState = random.choices(actionsArray)[0]
else:
curState = random.choices(actionsArray, weights=P[actions[curState],:].tolist()[0])[0]
# Mise à jour du score
score = scores[actions[curState], actions[choice]]
myPoints += score
userPoints += 1 - score
print(f'Prog [{curState}]: {myPoints} / User [{choice}]: {userPoints}')
if not cheat:
print(agentCode)
playRandomGuy(cheat=True) #pour essayer de gagner plus vite... :)
rock Move?p Prog [R]: 0.0 / User [P]: 1.0 Move?p R Prog [P]: 0.5 / User [P]: 1.5 Move?p P Prog [R]: 0.5 / User [P]: 2.5 Move?p R Prog [R]: 0.5 / User [P]: 3.5 Move?x
En principe le plus difficile à battre (impossible d'ailleurs) est le joueur "parfaitement aléatoire". Dans chacun des autres cas on détermine facilement une stratégie gagnante sur le long terme :
La difficulté consiste bien sûr à déterminer contre qui on joue : la seconde partie de l'exercice porte là-dessus.
Ensuite, écrivons un programme qui construit petit à petit une chaîne correspondant aux transitions utilisateur.
def counterAction(choix):
if choix == 'P':
return 'S'
if choix == 'R':
return 'P'
if choix == 'S':
return 'R'
def playAdaptiveGuy(seed=None):
global actions, scores
random.seed(seed)
actionsArray = list(actions.keys())
oldChoice = -1
myPoints, userPoints = 0, 0
P = np.matrix([[0, 0, 0], [0, 0, 0], [0, 0, 0]]) #unnormalized user matrix
while True:
# Attente d'une action utilisateur :
choice = input('Move?').upper()
if choice not in actionsArray:
break
# Choix d'une action (programme) :
indMax = np.argmax(P[oldChoice,:]) if oldChoice >= 0 else 0
curState = (random.choices(actionsArray)[0] if oldChoice < 0 or P[oldChoice,indMax] == 0
else counterAction(actionsArray[indMax]))
# Mise à jour de la matrice de transitions utilisateur :
if oldChoice >= 0:
P[oldChoice,actions[choice]] += 1
oldChoice = actions[choice]
# Mise à jour du score
score = scores[actions[curState], actions[choice]]
myPoints += score
userPoints += 1 - score
print(f'Prog [{curState}]: {myPoints} / User [{choice}]: {userPoints}')
# Normalisation de P pour affichage
sum_of_rows = np.array(P.sum(axis=1))
sum_of_rows[sum_of_rows == 0] = 1
normalized_P = P / np.squeeze(np.asarray(sum_of_rows))[:, None]
print(normalized_P)
playAdaptiveGuy()
Move?p Prog [P]: 0.5 / User [P]: 0.5 Move?p Prog [S]: 1.5 / User [P]: 0.5 Move?r Prog [S]: 1.5 / User [R]: 1.5 Move?r Prog [R]: 2.0 / User [R]: 2.0 Move?r Prog [P]: 3.0 / User [R]: 2.0 Move?s Prog [P]: 3.0 / User [S]: 3.0 Move?s Prog [S]: 3.5 / User [S]: 3.5 Move?x [[0.5 0.5 0. ] [0. 0.66666667 0.33333333] [0. 0. 1. ]]
L'article décrit en prenant l'exemple de la fonction $(x, y) \mapsto \sin(xy)$ une méthode d'intégration où l'échantillonnage du domaine est guidé par une chaîne de Markov (TODO).