root/testdeejayd/coreinterface.py

Revision 1511, 32.6 kB (checked in by Mickael Royer <mickael.royer@…>, 8 days ago)

[deejayd] In JSON answer, use Boolean type instead of 0/1 when needed

Line 
1# Deejayd, a media player daemon
2# Copyright (C) 2007-2009 Mickael Royer <mickael.royer@gmail.com>
3#                         Alexandre Rossi <alexandre.rossi@gmail.com>
4#
5# This program is free software; you can redistribute it and/or modify
6# it under the terms of the GNU General Public License as published by
7# the Free Software Foundation; either version 2 of the License, or
8# (at your option) any later version.
9#
10# This program is distributed in the hope that it will be useful,
11# but WITHOUT ANY WARRANTY; without even the implied warranty of
12# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13# GNU General Public License for more details.
14#
15# You should have received a copy of the GNU General Public License along
16# with this program; if not, write to the Free Software Foundation, Inc.,
17# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
18
19
20import time, random
21import threading
22
23from deejayd.interfaces import DeejaydError
24from deejayd.mediafilters import *
25
26
27class InterfaceTests:
28    """Test the deejayd daemon core interface, this test suite is to be used for testing the core facade and the client library."""
29
30    def testSetMode(self):
31        """Test setMode command"""
32
33        # ask an unknown mode
34        mode_name = self.testdata.getRandomString()
35        ans = self.deejayd.set_mode(mode_name)
36        self.assertRaises(DeejaydError, ans.get_contents)
37
38        # ask a known mode
39        known_mode = 'playlist'
40        ans = self.deejayd.set_mode(known_mode)
41        self.failUnless(ans.get_contents())
42
43        # Test if the mode has been set
44        status = self.deejayd.get_status().get_contents()
45        self.assertEqual(status['mode'], known_mode)
46
47    def testGetMode(self):
48        """Test getMode command"""
49        known_keys = ("playlist", "panel", "dvd", "webradio", "video")
50        ans = self.deejayd.get_mode()
51        for k in known_keys:
52            self.failUnless(k in ans.get_contents().keys())
53            self.failUnless(ans[k] in (True, False))
54
55    def testGetStats(self):
56        """Test getStats command"""
57        ans = self.deejayd.get_stats()
58        for k in ("audio_library_update","songs","artists","albums"):
59            self.failUnless(k in ans.keys())
60
61    def testPlaylistSaveRetrieve(self):
62        """Save a playlist and try to retrieve it."""
63
64        pl = []
65        djplname = self.testdata.getRandomString()
66
67        # Get current playlist
68        djpl = self.deejayd.get_playlist()
69        self.assertEqual(djpl.get().get_medias(), pl)
70
71        ans = djpl.add_path(self.testdata.getRandomString())
72        self.assertRaises(DeejaydError, ans.get_contents)
73
74        # Add songs to playlist
75        howManySongs = 3
76        for songPath in self.test_audiodata.getRandomSongPaths(howManySongs):
77            pl.append(self.test_audiodata.medias[songPath].tags["uri"])
78            ans = djpl.add_path(songPath)
79            self.failUnless(ans.get_contents())
80
81        # Check for the playlist to be of appropriate length
82        self.assertEqual(self.deejayd.get_status()['playlistlength'],
83                         howManySongs)
84
85        ans = djpl.save(djplname)
86
87        self.failUnless(ans.get_contents())
88        djpl_id = ans["playlist_id"]
89
90        # Check for the saved playslit to be available
91        retrievedPls = self.deejayd.get_playlist_list().get_medias()
92        self.failUnless(djplname in [p["name"] for p in retrievedPls])
93
94        # Retrieve the saved playlist
95        djpl = self.deejayd.get_recorded_playlist(djpl_id, djplname, 'static')
96        retrievedPl = djpl.get().get_medias()
97        for song_nb in range(len(pl)):
98            self.assertEqual(pl[song_nb], retrievedPl[song_nb]['uri'])
99
100    def testPlaylistActions(self):
101        """Test actions on current playlist."""
102        djpl = self.deejayd.get_playlist()
103        howManySongs = 4
104        for songPath in self.test_audiodata.getRandomSongPaths(howManySongs):
105            ans = djpl.add_path(songPath)
106            self.failUnless(ans.get_contents())
107
108        content = djpl.get().get_medias()
109        song = self.testdata.getRandomElement(content)
110        # shuffle the playlist
111        ans = djpl.shuffle()
112        self.failUnless(ans.get_contents())
113        # move this song
114        ans = djpl.move([song["id"]], 1)
115        self.failUnless(ans.get_contents())
116        # delete a song
117        djpl.del_song(song["id"]).get_contents()
118        content = djpl.get().get_medias()
119        self.assertEqual(len(content), howManySongs-1)
120
121    def testSavedStaticPlaylistActions(self):
122        """Test action on saved static playlist"""
123        djplname = self.testdata.getRandomString()
124        djpl = self.deejayd.get_playlist()
125        howManySongs = 3
126        # Add songs to playlist
127        for songPath in self.test_audiodata.getRandomSongPaths(howManySongs):
128            ans = djpl.add_path(songPath)
129            self.failUnless(ans.get_contents())
130        # save playlist
131        ans = djpl.save(djplname)
132        self.failUnless(ans.get_contents())
133        djpl_id = ans["playlist_id"]
134
135        # add songs in the saved playlist
136        savedpl = self.deejayd.get_recorded_playlist(djpl_id, djplname,\
137                "static")
138        for songPath in self.test_audiodata.getRandomSongPaths(howManySongs):
139            # add twice the same song
140            ans = savedpl.add_path(songPath)
141            self.failUnless(ans.get_contents())
142            ans = savedpl.add_path(songPath)
143            self.failUnless(ans.get_contents())
144        content = savedpl.get().get_medias()
145        self.assertEqual(len(content), howManySongs*3)
146
147    def testSavedMagicPlaylistActions(self):
148        """Test action on saved magic playlist"""
149        djplname = self.testdata.getRandomString()
150        djpl_infos = self.deejayd.create_recorded_playlist(djplname,\
151                "magic").get_contents()
152        djpl = self.deejayd.get_recorded_playlist(djpl_infos["pl_id"],\
153                djplname, "magic")
154        filter = Equals('genre', self.test_audiodata.getRandomGenre())
155        rnd_filter = Equals('genre', self.testdata.getRandomString())
156
157        # add filter
158        djpl.add_filter(filter).get_contents()
159        # verify playlist
160        ans = djpl.get()
161        self.assertEqual(len(ans.get_filter()), 1)
162        self.failUnless(len(ans.get_medias()) > 0)
163        self.failUnless(filter.equals(ans.get_filter()[0]))
164
165        # add random filter
166        djpl.remove_filter(filter).get_contents()
167        djpl.add_filter(rnd_filter).get_contents()
168        # verify playlist
169        ans = djpl.get()
170        self.assertEqual(len(ans.get_filter()), 1)
171        self.assertEqual(len(ans.get_medias()), 0)
172        self.failUnless(rnd_filter.equals(ans.get_filter()[0]))
173
174        # add filter and set property
175        djpl.clear_filters().get_contents()
176        djpl.add_filter(filter).get_contents()
177        djpl.set_property("use-limit", "1").get_contents()
178        djpl.set_property("limit-value", "1").get_contents()
179        # verify playlist
180        ans = djpl.get()
181        self.assertEqual(len(ans.get_filter()), 1)
182        self.assertEqual(len(ans.get_medias()), 1)
183        self.failUnless(filter.equals(ans.get_filter()[0]))
184
185    def testWebradioAddRetrieve(self):
186        """Save a webradio and check it is in the list, then delete it."""
187
188        wr_list = self.deejayd.get_webradios()
189
190        # try to set wrong source
191        ans = wr_list.set_source(self.testdata.getRandomString())
192        self.assertRaises(DeejaydError, ans.get_contents)
193        # set local source
194        ans = wr_list.set_source('local')
195        self.failUnless(ans.get_contents())
196        # local does not have categorie, raise DeejaydError if we try to get it
197        ans = wr_list.get_source_categories('local')
198        self.assertRaises(DeejaydError, ans.get_contents)
199
200
201        # Test for bad URI and inexistant playlist
202        for badURI in [[self.testdata.getRandomString(50)],\
203                   ['http://' + self.testdata.getRandomString(50) + '.pls']]:
204            ans = wr_list.add_webradio(self.testdata.getRandomString(), badURI)
205            self.assertRaises(DeejaydError, ans.get_contents)
206
207        testWrName = self.testdata.getRandomString()
208
209        testWrUrls = []
210        for urlCount in range(self.testdata.getRandomInt(10)):
211            testWrUrls.append('http://' + self.testdata.getRandomString(50))
212
213        ans = wr_list.add_webradio(testWrName, testWrUrls)
214        self.failUnless(ans.get_contents())
215
216        wr_list = self.deejayd.get_webradios()
217        wr_names = [wr['title'] for wr in wr_list.get().get_medias()]
218        self.failUnless(testWrName in wr_names)
219
220        retrievedWr = [wr for wr in wr_list.get().get_medias()\
221                          if wr['title'] == testWrName][0]
222
223        for url in testWrUrls:
224            self.failUnless(url in retrievedWr['urls'])
225
226        ans = wr_list.delete_webradio(51)
227        self.assertRaises(DeejaydError, ans.get_contents)
228
229        ans = wr_list.delete_webradio(retrievedWr['id'])
230        self.failUnless(ans.get_contents())
231
232        wr_list = self.deejayd.get_webradios().get().get_medias()
233        self.failIf(testWrName in [wr['title'] for wr in wr_list])
234
235    def testQueue(self):
236        """Add songs to the queue, try to retrieve it, delete some songs in it, then clear it."""
237        q = self.deejayd.get_queue()
238
239        myq = []
240        how_many_songs = 10
241        for song_path in self.test_audiodata.getRandomSongPaths(how_many_songs):
242            myq.append(self.test_audiodata.medias[song_path].tags["uri"])
243            ans = q.add_path(song_path)
244            self.failUnless(ans.get_contents())
245
246        ddq = q.get()
247
248        ddq_uris = [song['uri'] for song in ddq.get_medias()]
249        for song_uri in myq:
250            self.failUnless(song_uri in ddq_uris)
251
252        random.seed(time.time())
253        songs_to_delete = random.sample(myq, how_many_songs / 3)
254        ans = q.del_songs([song['id'] for song in ddq.get_medias()\
255                                      if song['uri'] in songs_to_delete])
256        self.failUnless(ans.get_contents())
257
258        ddq = q.get()
259        ddq_uris = [song['uri'] for song in ddq.get_medias()]
260        for song_uri in myq:
261            if song_uri in songs_to_delete:
262                self.failIf(song_uri in ddq_uris)
263            else:
264                self.failUnless(song_uri in ddq_uris)
265
266        ans = q.clear()
267        self.failUnless(ans.get_contents())
268        ddq = q.get()
269        self.assertEqual(ddq.get_medias(), [])
270
271    def testPanel(self):
272        """ Test panel source actions """
273        panel = self.deejayd.get_panel()
274
275        # get panel tags
276        tags = panel.get_panel_tags().get_contents()
277        for tag in tags:
278            self.failUnless(tag in ['genre', 'artist',\
279                    'various_artist', 'album'])
280
281        # set filter
282        bad_tag = self.testdata.getRandomString()
283        random_str = self.testdata.getRandomString()
284        ans = panel.set_panel_filters(bad_tag, [random_str]) # random tags
285        self.assertRaises(DeejaydError, ans.get_contents)
286
287        tag = self.testdata.getRandomElement(tags)
288        panel.set_panel_filters(tag, [random_str]).get_contents()
289        ans = panel.get()
290        self.assertEqual([], ans.get_medias())
291
292        # remove bad filter
293        panel.remove_panel_filters(tag).get_contents()
294        ans = panel.get()
295        self.failUnless(len(ans.get_medias()) > 0)
296
297        # get correct value for a tag
298        result = self.deejayd.mediadb_list(tag, None).get_contents()
299        value = self.testdata.getRandomElement(result)
300        panel.set_panel_filters(tag, [value]).get_contents()
301        ans = panel.get()
302        self.failUnless(len(ans.get_medias()) > 0)
303
304        # clear tag
305        panel.clear_panel_filters().get_contents()
306        ans = panel.get()
307        self.failUnless(len(ans.get_medias()) > 0)
308
309        # set search
310        ans = panel.set_search_filter(bad_tag, random_str) # random tags
311        self.assertRaises(DeejaydError, ans.get_contents)
312
313        search_tag = self.testdata.getRandomElement(['title', 'album',\
314                'genre', 'artist'])
315        panel.set_search_filter(search_tag, random_str).get_contents()
316        ans = panel.get()
317        self.assertEqual([], ans.get_medias())
318
319        # clear search
320        panel.clear_search_filter().get_contents()
321        ans = panel.get()
322        self.failUnless(len(ans.get_medias()) > 0)
323
324        # test sort
325        result = panel.set_sorts([(bad_tag, 'ascending')])
326        self.assertRaises(DeejaydError, result.get_contents)
327
328        tag = self.testdata.getRandomElement(['title', 'rating', 'genre'])
329        media_list = [m[tag] for m in ans.get_medias()]
330        media_list.sort()
331        panel.set_sorts([(tag, 'ascending')]).get_contents()
332        ans = panel.get()
333        for idx, m in enumerate(ans.get_medias()):
334            self.assertEqual(m[tag], media_list[idx])
335
336        # save a playlist and update active list
337        djpl = self.deejayd.get_playlist()
338        ans = self.deejayd.get_audio_dir()
339        dir = self.testdata.getRandomElement(ans.get_directories())
340        djpl.add_paths([dir]).get_contents()
341        test_pl_name = self.testdata.getRandomString()
342        djpl.save(test_pl_name).get_contents()
343        pl_list = self.deejayd.get_playlist_list().get_medias()
344        if len(pl_list) != 1:
345            raise DeejaydError("playlist not saved")
346
347        # save a playlist and update active list
348        bad_plid = self.testdata.getRandomInt(2000, 1000)
349        ans = panel.set_active_list("playlist", bad_plid)
350        self.assertRaises(DeejaydError, ans.get_contents)
351
352        panel.set_active_list("playlist", pl_list[0]["id"]).get_contents()
353        panel_list = panel.get_active_list().get_contents()
354        self.assertEqual(panel_list["type"], "playlist")
355        self.assertEqual(panel_list["value"], pl_list[0]["id"])
356
357    def testVideo(self):
358        """ Test video source actions """
359        if self.video_support:
360            video_obj = self.deejayd.get_video()
361            # choose a wrong directory
362            rand_dir = self.testdata.getRandomString()
363            ans = video_obj.set(rand_dir, "directory")
364            self.assertRaises(DeejaydError, ans.get_contents)
365
366            # get contents of root dir and try to set video directory
367            ans = self.deejayd.get_video_dir()
368            dir = self.testdata.getRandomElement(ans.get_directories())
369            video_obj.set(dir, "directory").get_contents()
370
371            # test videolist content
372            video_list = video_obj.get().get_medias()
373            ans = self.deejayd.get_video_dir(dir)
374            self.assertEqual(len(video_list), len(ans.get_files()))
375            # sort videolist content
376            sort = [["rating", "ascending"]]
377            video_obj.set_sorts(sort).get_contents()
378            video_list = video_obj.get()
379            self.assertEqual(video_list.get_sort(), sort)
380            # set bad sort
381            rnd_sort = [(self.testdata.getRandomString(), "ascending")]
382            ans = video_obj.set_sorts(rnd_sort)
383            self.assertRaises(DeejaydError, ans.get_contents)
384
385            # search a wrong title
386            rand = self.testdata.getRandomString()
387            video_obj.set(rand, "search").get_contents()
388            video_list = video_obj.get().get_medias()
389            self.assertEqual(len(video_list), 0)
390        else:
391            try: video_obj = self.deejayd.get_video()
392            except DeejaydError: # we test core
393                pass
394            else:
395                ans = video_obj.get()
396                self.assertRaises(DeejaydError, ans.get_medias)
397
398    def testMediaRating(self):
399        """Test media rating method"""
400        # wrong media id
401        random_id = self.testdata.getRandomInt(2000, 1000)
402        ans = self.deejayd.set_media_rating([random_id], "2", "audio")
403        self.assertRaises(DeejaydError, ans.get_contents)
404
405        ans = self.deejayd.get_audio_dir()
406        files = ans.get_files()
407        file_ids = [f["media_id"] for f in files]
408        # wrong rating
409        ans = self.deejayd.set_media_rating(file_ids, "9", "audio")
410        self.assertRaises(DeejaydError, ans.get_contents)
411        # wrong library
412        rand_lib = self.testdata.getRandomString()
413        ans = self.deejayd.set_media_rating(file_ids, "2", rand_lib)
414        self.assertRaises(DeejaydError, ans.get_contents)
415
416        ans =  self.deejayd.set_media_rating(file_ids, "4", "audio")
417        self.failUnless(ans.get_contents())
418        ans = self.deejayd.get_audio_dir()
419        files = ans.get_files()
420        for f in files:
421            self.assertEqual(4, int(f["rating"]))
422
423    def testAudioLibrary(self):
424        """ Test request on audio library (get_audio_dir, search)"""
425        # try to get contents of an unknown directory
426        rand_dir = self.testdata.getRandomString()
427        ans = self.deejayd.get_audio_dir(rand_dir)
428        self.assertRaises(DeejaydError, ans.get_contents)
429
430        # get contents of root dir and try to get content of a directory
431        ans = self.deejayd.get_audio_dir()
432        dir = self.testdata.getRandomElement(ans.get_directories())
433        ans = self.deejayd.get_audio_dir(dir)
434        song_files = ans.get_files()
435        self.failUnless(len(song_files) > 0)
436
437        # search an unknown terms
438        text = self.testdata.getRandomString()
439        ans = self.deejayd.audio_search(text)
440        self.assertEqual(ans.get_medias(), [])
441
442        # search a known terms
443        file = self.testdata.getRandomElement(song_files)
444        ans = self.deejayd.audio_search(file["title"], "title")
445        self.failUnless(len(ans.get_medias()) > 0)
446
447    def testVideoLibrary(self):
448        """ Test request on video library """
449        if self.video_support:
450            # try to get contents of an unknown directory
451            rand_dir = self.testdata.getRandomString()
452            ans = self.deejayd.get_video_dir(rand_dir)
453            self.assertRaises(DeejaydError, ans.get_contents)
454
455            # get contents of root dir and try to get content of a directory
456            ans = self.deejayd.get_video_dir()
457            dir = self.testdata.getRandomElement(ans.get_directories())
458            ans = self.deejayd.get_video_dir(dir)
459            files = ans.get_files()
460            self.failUnless(len(files) > 0)
461        else:
462            ans = self.deejayd.get_video_dir()
463            self.assertRaises(DeejaydError, ans.get_contents)
464
465    def testSetOption(self):
466        """ Test set_option commands"""
467        # unknown option
468        opt = self.testdata.getRandomString()
469        ans = self.deejayd.set_option("playlist", opt, 1)
470        self.assertRaises(DeejaydError, ans.get_contents)
471        ans = self.deejayd.set_option(opt, "repeat", True)
472        self.assertRaises(DeejaydError, ans.get_contents)
473
474        ans = self.deejayd.set_option("webradio", "playorder", "inorder")
475        self.assertRaises(DeejaydError, ans.get_contents)
476
477        # set playlist option
478        ans = self.deejayd.set_option("playlist", "playorder", "random")
479        ans.get_contents()
480        status = self.deejayd.get_status().get_contents()
481        self.assertEqual(status["playlistplayorder"], "random")
482        # set video option
483        if self.video_support:
484            ans = self.deejayd.set_option("video", "repeat", True)
485            ans.get_contents()
486            status = self.deejayd.get_status().get_contents()
487            self.assertEqual(status["videorepeat"], True)
488
489    def testAudioPlayer(self):
490        """ Test player commands (play, pause,...) for audio """
491        # try to set volume
492        vol = 30
493        ans = self.deejayd.set_volume(vol)
494        self.failUnless(ans.get_contents())
495        status = self.deejayd.get_status().get_contents()
496        self.assertEqual(status["volume"], vol)
497
498        # load songs in main playlist
499        djpl = self.deejayd.get_playlist()
500        ans = self.deejayd.get_audio_dir()
501        dir = self.testdata.getRandomElement(ans.get_directories())
502        djpl.add_paths([dir]).get_contents()
503
504        # play song
505        self.deejayd.set_mode("playlist").get_contents()
506        self.deejayd.play_toggle().get_contents()
507        # verify status
508        status = self.deejayd.get_status().get_contents()
509        self.assertEqual(status["state"], "play")
510        # pause
511        self.deejayd.play_toggle().get_contents()
512        # verify status
513        status = self.deejayd.get_status().get_contents()
514        self.assertEqual(status["state"], "pause")
515        self.deejayd.play_toggle().get_contents()
516        # next and previous
517        self.deejayd.next().get_contents()
518        self.deejayd.previous().get_contents()
519        status = self.deejayd.get_status().get_contents()
520        self.assertEqual(status["state"], "play")
521        # seek command
522        self.deejayd.seek(1).get_contents()
523
524        # test get_current command
525        cur = self.deejayd.get_current().get_medias()
526        self.assertEqual(len(cur), 1)
527
528        self.deejayd.stop().get_contents()
529
530    def testVideoPlayer(self):
531        """ Test player commands (play, pause,...) for video """
532        if self.video_support:
533            video_obj = self.deejayd.get_video()
534            # set video mode
535            self.deejayd.set_mode("video").get_contents()
536
537            # choose directory
538            ans = self.deejayd.get_video_dir()
539            dir = self.testdata.getRandomElement(ans.get_directories())
540            video_obj.set(dir, "directory").get_contents()
541
542            # play video file
543            self.deejayd.play_toggle().get_contents()
544            # verify status
545            status = self.deejayd.get_status().get_contents()
546            self.assertEqual(status["state"], "play")
547
548            # test set_player_option cmd
549            self.deejayd.set_player_option("av_offset", 100).get_contents()
550            self.deejayd.set_player_option("aspect_ratio","16:9").get_contents()
551            # bad option name
552            ans = self.deejayd.set_player_option(\
553                    self.testdata.getRandomString(),0)
554            self.assertRaises(DeejaydError, ans.get_contents)
555            # bad option value
556            ans = self.deejayd.set_player_option("av_offset",\
557                    self.testdata.getRandomString())
558            self.assertRaises(DeejaydError, ans.get_contents)
559            ans = self.deejayd.set_player_option("aspect_ratio",\
560                    self.testdata.getRandomString())
561            self.assertRaises(DeejaydError, ans.get_contents)
562
563            self.deejayd.stop().get_contents()
564
565    def testDvd(self):
566        """ Test dvd commands"""
567        if self.video_support:
568            status = self.deejayd.get_status().get_contents()
569            dvd_id = status["dvd"]
570
571            self.deejayd.dvd_reload().get_contents()
572            status = self.deejayd.get_status().get_contents()
573            self.assertEqual(status["dvd"], dvd_id + 1)
574        else:
575            ans = self.deejayd.dvd_reload()
576            self.assertRaises(DeejaydError, ans.get_contents)
577
578    def test_mediadb_list(self):
579        """Test db queries for tags listing."""
580
581        tag = 'artist'
582        filter = Or(Equals('genre', self.test_audiodata.getRandomGenre()),
583                    Equals('genre', self.test_audiodata.getRandomGenre()))
584
585        expected_tag_list = []
586
587        for song_info in self.test_audiodata.medias.values():
588            matches = False
589            for f in filter.filterlist:
590                if song_info.tags['genre'] == f.pattern:
591                    matches = True
592            if matches:
593                if song_info.tags[tag] not in expected_tag_list:
594                    expected_tag_list.append(song_info.tags[tag])
595
596        result = self.deejayd.mediadb_list(tag, filter)
597
598        for tagvalue in expected_tag_list:
599            self.failUnless(tagvalue in result)
600
601
602class InterfaceSubscribeTests:
603    """Test the subscription interface. This is for the core and the async client only."""
604
605    def test_subscription(self):
606        """Checks that signals subscriptions get in and out."""
607        server_notification = threading.Event()
608
609        sub_id = self.deejayd.subscribe('player.status',
610                                        lambda x: server_notification.set())
611        self.failUnless((sub_id, 'player.status')\
612                        in self.deejayd.get_subscriptions().items())
613
614        self.deejayd.unsubscribe(sub_id)
615        self.failUnless((sub_id, 'player.status')\
616                        not in self.deejayd.get_subscriptions().items())
617
618    def generic_sub_bcast_test(self, signal_name, trigger, trigger_args=()):
619        """Checks that signal_name signal is broadcast when one of the trigger is involved."""
620        server_notification = threading.Event()
621
622        sub_id = self.deejayd.subscribe(signal_name,
623                                        lambda x: server_notification.set())
624
625        ans = trigger(*trigger_args)
626        if ans:
627            ans.get_contents()
628        server_notification.wait(4)
629        self.failUnless(server_notification.isSet(),
630                        '%s signal was not broadcasted by %s.'\
631                        % (signal_name, trigger.__name__))
632        server_notification.clear()
633
634        self.deejayd.unsubscribe(sub_id)
635
636    def test_sub_broadcast_player_status(self):
637        """Checks that player.status signals are broadcasted."""
638
639        trigger_list = ((self.deejayd.play_toggle, ()),
640                        (self.deejayd.set_option, ("playlist", 'repeat', 1)),
641                        (self.deejayd.set_option, ('panel', "playorder",\
642                                                   "random")),
643                        (self.deejayd.set_volume, (51, )),
644                        (self.deejayd.seek, (5, )),
645                       )
646
647        for trig in trigger_list:
648            self.generic_sub_bcast_test('player.status', trig[0], trig[1])
649
650    def test_sub_broadcast_player_current(self):
651        """Checks that player.current signals are broadcasted."""
652
653        trigger_list = ((self.deejayd.next, ()),
654                        (self.deejayd.previous, ())
655                       )
656
657        for trig in trigger_list:
658            self.generic_sub_bcast_test('player.current', trig[0], trig[1])
659
660    def test_sub_broadcast_player_plupdate(self):
661        """Checks that player.plupdate signals are broadcasted."""
662
663        djpl = self.deejayd.get_playlist()
664        ans = self.deejayd.get_audio_dir()
665        dir = self.testdata.getRandomElement(ans.get_directories())
666
667        trigger_list = ((djpl.add_paths, ([dir], )),
668                        (djpl.shuffle, ()),
669                        (djpl.clear, ()),
670                       )
671
672        for trig in trigger_list:
673            self.generic_sub_bcast_test('player.plupdate', trig[0], trig[1])
674
675    def test_sub_broadcast_playlist_listupdate(self):
676        """Checks that playlist.listupdate signals are broadcasted."""
677
678        djpl = self.deejayd.get_playlist()
679        ans = self.deejayd.get_audio_dir()
680        dir = self.testdata.getRandomElement(ans.get_directories())
681        djpl.add_paths([dir]).get_contents()
682
683        test_pl_name = self.testdata.getRandomString()
684        test_pl_name2 = self.testdata.getRandomString()
685
686        self.generic_sub_bcast_test('playlist.listupdate',
687                                    djpl.save, (test_pl_name, ))
688
689        retrievedPls = self.deejayd.get_playlist_list().get_medias()
690        for pls in retrievedPls:
691            if pls["name"] == test_pl_name:
692                djpl_id = pls["id"]
693                break
694        trigger_list = (
695                        (self.deejayd.erase_playlist, ([djpl_id], )),
696                        (djpl.save, (test_pl_name2,)),
697                       )
698
699        for trig in trigger_list:
700            self.generic_sub_bcast_test('playlist.listupdate', trig[0], trig[1])
701
702    def test_sub_broadcast_playlist_update(self):
703        """Checks that playlist.update signals are broadcasted."""
704        ans = self.deejayd.get_audio_dir()
705        dir = self.testdata.getRandomElement(ans.get_directories())
706        filter = Equals('genre', self.test_audiodata.getRandomGenre())
707
708        st_pl_name = self.testdata.getRandomString()
709        mg_pl_name = self.testdata.getRandomString()
710
711        st_pl_infos = self.deejayd.create_recorded_playlist(st_pl_name,\
712                'static').get_contents()
713        mg_pl_infos = self.deejayd.create_recorded_playlist(mg_pl_name,\
714                'magic').get_contents()
715
716        st_djpl = self.deejayd.get_recorded_playlist(st_pl_infos["pl_id"],\
717                st_pl_name, 'static')
718        mg_djpl = self.deejayd.get_recorded_playlist(mg_pl_infos["pl_id"],\
719                mg_pl_name, 'magic')
720
721        trigger_list = (
722                        (st_djpl.add_path, (dir,)),
723                        (mg_djpl.add_filter, (filter,)),
724                        (mg_djpl.remove_filter, (filter,)),
725                        (mg_djpl.set_property, ('use-limit', '1')),
726                       )
727        for trig in trigger_list:
728            self.generic_sub_bcast_test('playlist.update', trig[0], trig[1])
729
730    def test_sub_broadcast_panel_update(self):
731        """Checks that panel.update signals are broadcasted."""
732
733        # first save a playlist
734        djpl = self.deejayd.get_playlist()
735        ans = self.deejayd.get_audio_dir()
736        dir = self.testdata.getRandomElement(ans.get_directories())
737        djpl.add_paths([dir]).get_contents()
738        test_pl_name = self.testdata.getRandomString()
739        djpl.save(test_pl_name).get_contents()
740        pl_list = self.deejayd.get_playlist_list().get_medias()
741        if len(pl_list) != 1:
742            raise DeejaydError("playlist not saved")
743
744        djpn = self.deejayd.get_panel()
745
746        trigger_list = (
747                        (djpn.set_active_list, ("playlist", pl_list[0]["id"])),
748                        (djpn.set_active_list, ("panel", "0")),
749                        (djpn.set_panel_filters, ("genre", "zboub")),
750                        (djpn.clear_panel_filters, []),
751                        (djpn.set_sorts, ([("genre", "ascending")],)),
752                       )
753
754        for trig in trigger_list:
755            self.generic_sub_bcast_test('panel.update', trig[0], trig[1])
756
757    def test_sub_broadcast_video_update(self):
758        """Checks that video.update signals are broadcasted."""
759
760        if not self.video_support:
761            return True
762
763        djvideo = self.deejayd.get_video()
764        ans = self.deejayd.get_video_dir()
765        dir = self.testdata.getRandomElement(ans.get_directories())
766
767        trigger_list = (
768                        (djvideo.set, (dir, "directory")),
769                        (djvideo.set_sorts, ([("title", "ascending")],)),
770                       )
771
772        for trig in trigger_list:
773            self.generic_sub_bcast_test('video.update', trig[0], trig[1])
774
775    def test_sub_broadcast_webradio_listupdate(self):
776        """Checks that webradio.listupdate signals are broadcasted."""
777
778        wr_list = self.deejayd.get_webradios()
779        test_wr_name = self.testdata.getRandomString()
780        test_wr_urls = 'http://' + self.testdata.getRandomString(50)
781
782        self.generic_sub_bcast_test('webradio.listupdate',
783                                    wr_list.add_webradio,
784                                    (test_wr_name, test_wr_urls))
785
786        retrieved_wr = [wr for wr in wr_list.get().get_medias()\
787                           if wr['title'] == test_wr_name + '-1'][0]
788        self.generic_sub_bcast_test('webradio.listupdate',
789                                    wr_list.delete_webradio,
790                                    (retrieved_wr['id'], ))
791
792    def test_sub_broadcast_queue_update(self):
793        """Checks that queue.update signals are broadcasted."""
794
795        q = self.deejayd.get_queue()
796
797        ans = self.deejayd.get_audio_dir()
798        dir = self.testdata.getRandomElement(ans.get_directories())
799
800        self.generic_sub_bcast_test('queue.update', q.add_paths, ([dir], ))
801
802        retrieved_song_id = [song['id'] for song in q.get().get_medias()]
803        self.generic_sub_bcast_test('queue.update',
804                                    q.del_songs, (retrieved_song_id, ))
805
806    def test_sub_broadcast_dvd_update(self):
807        """Checks that dvd.update signals are broadcasted."""
808        if not self.video_support:
809            return True
810        self.generic_sub_bcast_test('dvd.update', self.deejayd.dvd_reload, ())
811
812    def test_sub_broadcast_mode(self):
813        """Checks that mode signals are broadcasted."""
814        self.generic_sub_bcast_test('mode', self.deejayd.set_mode, ('video', ))
815
816    def test_sub_broadcast_mediadb_aupdate(self):
817        """Checks that mediadb.aupdate signals are broadcasted."""
818
819        # This is tested only using inotify support
820        self.generic_sub_bcast_test('mediadb.aupdate',
821                                    self.test_audiodata.addMedia)
822
823    def test_sub_broadcast_mediadb_vupdate(self):
824        """Checks that mediadb.vupdate signals are broadcasted."""
825        if not self.video_support:
826            return True
827        # This is tested only using inotify support
828        self.generic_sub_bcast_test('mediadb.vupdate',
829                                    self.test_videodata.addMedia)
830
831
832# vim: ts=4 sw=4 expandtab
Note: See TracBrowser for help on using the browser.