Txosc
De Centre de Ressources Numériques - Labomedia
Une implémentation du protocole Open Sound Control (OSC) pour Twisted.
Sommaire
Ressources
- twistedmatrix.com Twisted est un moteur réseau piloté par les événements, écrit en Python et distribué sous la licence libre MIT. Twisted sur Wikipedia EN.
- bitbucket.org/arjan Site du projet. Cette bibliothèque implémente OSC version 1.1 sur UDP et TCP pour Twisted.
Projets utilisant txosc
Installation
Twisted est utilisé par Ubuntu et y est installé par défaut. Pour installer txosc :
sudo apt-get install python-txosc
Sinon:
sudo apt-get install python-twisted python-txosc
Bout de code en complément des exemples de txosc
Réception en UDP dans un script
#!/usr/bin/env python
from twisted.internet import reactor
from txosc import osc
from txosc import dispatch
from txosc import async
def foo_handler(message, address):
    """Print an OSC message and the address it was received from.

    Written as a plain module-level function so it can be registered as a
    receiver callback without belonging to any class.
    """
    print("foo_handler")
    report = " Got %s from %s" % (message, address)
    print(report)
    # getValues() returns the arguments of the message as a list
    arguments = message.getValues()
    print(arguments)
class UDPReceiverApplication(object):
"""
Example that receives UDP OSC messages.
"""
def __init__(self, port):
self.port = port
self.receiver = dispatch.Receiver()
self._server_port = reactor.listenUDP(self.port, async.DatagramServerProtocol(self.receiver))
print("Listening on osc.udp://localhost:%s" % (self.port))
self.receiver.addCallback("/foo", foo_handler) # methode definie en dehors de la classe
self.receiver.addCallback("/toto", self.toto_handler) # methode definie dans la classe
def toto_handler(self, message, address):
print("toto_handler")
print(" Got %s from %s" % (message, address))
# message.getValues() est une liste
print(message.getValues())
if __name__ == "__main__":
    # UDP port on which OSC messages are expected
    listen_port = 1234
    app = UDPReceiverApplication(listen_port)
    # hand control over to the Twisted event loop
    reactor.run()
Envoi et réception en UDP en local dans un script
#!/usr/bin/env python
"""
This example sends a message and a bundle to localhost over UDP.
This is only a reminder.
This example is in the public domain.
"""
# test with Twisted and txosc
# you must install python-twisted and python-txosc
from twisted.internet import reactor
from txosc import osc
from txosc import dispatch
from txosc import async
import time
send_address = 'localhost', 9000
time_init = time.time()

# image is a list of V rows; each row is a list of H values (0 .. H-1).
# No "global" statement is needed: a name assigned at module level is
# already global.
H = 4
V = 4
image = [list(range(H)) for v in range(V)]
# print() with a single argument gives the same output on Python 2 and 3
print(image)
class UDPReceiverApplication(object):
def __init__(self, port):
self.port = port
self.receiver = dispatch.Receiver()
self._server_port = reactor.listenUDP(self.port, async.DatagramServerProtocol(self.receiver))
print("Listening on osc.udp://localhost:%s" % (self.port))
self.receiver.addCallback("/image", self.image_handler)
def image_handler(self, message, address):
print("Got image_handler", message)
print(" Got %s from %s" % (message, address))
class UDPSenderApplication(object):
def __init__(self, port, host="127.0.0.1"):
self.port = port
self.host = host
self.client = async.DatagramClientProtocol()
self._client_port = reactor.listenUDP(0, self.client)
def send_message(self, message):
self.client.send(message, (self.host, self.port))
if __name__ == "__main__":
    # receiver
    appR = UDPReceiverApplication(9000)

    # sender
    # No pause between receiver and sender: the reactor is not running yet,
    # so a time.sleep() here would only block the script without letting
    # anything happen in the meantime.
    appS = UDPSenderApplication(9000)

    # one message holding every value of the image, row after row
    msg = osc.Message("/image")
    for row in image:
        for value in row:
            msg.add(value)
    appS.send_message(msg)

    # the same message twice in a bundle
    b = osc.Bundle()
    b.add(msg)
    b.add(msg)
    appS.send_message(b)

    # twisted run
    reactor.run()
Récupération des valeurs dans un message
def image_handler(self, message, address):
    """Print a received message, then extract its argument values."""
    print("Got image_handler", message)
    origin_line = " Got %s from %s" % (message, address)
    print(origin_line)
    # getValues() gives access to the values carried by the message
    values = message.getValues()