URLS
Perl 5.005 with threading: CPAN
MP3::Napster: CPAN
MP3::Info: CPAN
Digest::MD5: CPAN
Napster protocol: http://www.onelist.com/community/napdev/
Last time in my column I talked about creating a streaming audio server for MPEG level 3 (MP3) files. Continuing with the same theme, today I'm giving you a peek at my latest work in progress, a Perl module for the Napster protocol called MP3::Napster.
Napster (http://www.napster.com) is an online community of music aficionados that combines three major functions. First, it provides an IRC-like chat function in which members can subscribe to various channels (corresponding to musical genres) and exchange messages. Second, Napster provides file transfer services. Anyone who logs into a Napster server can publish a collection of MP3 files, making the files available for peer-to-peer transfer to any other user of the Napster service. Finally, Napster provides a search service for all these files, depicted in Figure 1. You can enter a few words from a song's title or the artist's name, and in a few seconds the server will list all the other users who are sharing a matching song. A click of the mouse later, and you can transfer this song to your local machine.
Example 1 will give you a better idea of what Napster is all about. This demonstrates a session using a small line-oriented Napster client that I wrote to demonstrate and test the MP3::Napster module. The session begins by prompting me for my username and password. This is followed by a server statistics message informing us that there are currently 155193 files available for download from 1106 users, and a message of the day which has been truncated for brevity.
As you can see, I issued the /search command for songs containing the word madonna. This returned 100 songs (the maximum), most of which are by the artist Madonna. (Again, the full list has been truncated for brevity.) The list shows the login name of the user who published the file, the streamrate of the MP3 file (where larger numbers give higher fidelity), the size of the file, the user's modem/link speed, and the title of the song. I issued the /play command to play song number 10 ("La Isla Bonita"). After a second the client began to download the song and stream it to an MP3 decoder. The messages you see following the /play command come from the command-line mpg123 program. After listening to about twenty seconds of the song, I aborted the download with /abort, and then issued a /download command to download a copy to my local disk. Downloads occur in the background; throughout the rest of the session you will see occasional messages like "[ Madonna-La_Isla_Bonita.mp3: 1002932 bytes ]" indicating the current status of the download.
While this was going on, I joined the Alternative channel using the /join command, and was greeted by a welcome message and a list of active users (truncated for space). The transcript now shows intermittent messages from several of these users, as well as server messages indicating when users arrive or depart the channel. At this point, anything I typed other than commands would have been sent as a public message to the channel. Being shy, I didn't avail myself of this opportunity.
I now performed a new search for "bach", and got 79 results, mostly for J.S. Bach, but a few from other artists such as Tal Bachman. I initiated another download, and you can now see status messages from the Bach download intermixed with messages from the ongoing Madonna download. To confirm that both downloads were going on simultaneously, I issued the /status command. After another half minute or so, both downloads were complete and I logged out using /quit.
Apart from the ethical issues - the example session shows how easily the service invites MP3 piracy - Napster is a very interesting networking protocol. First of all, it's one of a rare breed of peer-to-peer protocols designed to be used across the Internet. Instead of having a step in which Napster users upload their MP3 files to a centralized server, all the file transfers occur directly between users' computers. The Napster server acts as a search engine, identifying which users own the desired song. When you go to download a song, your Napster client contacts the remote user's client and arranges a peer-to-peer transfer.
Another interesting aspect of Napster is its asynchronous nature. Most Internet networking protocols are highly synchronous. A client issues a request and then waits for a response from the server. Responses occur at regular and predictable intervals, and never arrive out of order. In contrast, the Napster protocol is highly asynchronous. After the initial login, the server can send messages to the client at any time, even while processing long-running requests such as searches. This accommodates the interactive IRC-style chat functions, and also allows multiple downloads to proceed in parallel.
These characteristics make writing a Napster client a bit challenging. Fortunately, the Napster protocol has been reverse engineered by a number of Open Source developers, and you can find a partial specification at http://www.onelist.com/community/napdev/. As a result there are already a number of freely available clients and servers for this protocol. I was able to develop the Perl client library by following the specification and reading the source code for clarification where necessary.
Because of the asynchronous nature of the protocol, MP3::Napster has to handle multiple threads of execution simultaneously. For example, it must be able to capture user chitchat while downloading song files. I could have implemented this functionality using multiprocessing or multiplexing (see "Client-Server Applications" in TPJ #15). However, multiprocessing requires the fork() system call, which is not available on non-Unix platforms, and multiplexing makes the code significantly more complex. Instead, I relied on the multithreading that first appeared in Perl 5.005. Although Perl threading is still experimental, it is more than stable enough to use in a non-critical application such as this one. To use MP3::Napster, you therefore need to run it under a version of Perl built with threading enabled, preferably a recent development version. I used version 5.005_63 to develop this module.
The MP3::Napster API
The MP3::Napster API is quite simple. First, you create an MP3::Napster object by calling its new() method:
$nap = MP3::Napster->new;
The new() method attempts to establish a connection with a Napster server. There are several servers, so new() first contacts a master server which then gives it the host name and port of the best server to use, where "best" is determined using load balancing so that a particular server isn't overwhelmed. If the connection fails, new() returns undef. Otherwise it returns an MP3::Napster object, which is then used for all interactions with the server.
The next step is to log in as an existing user, or to register as a new user. login() has this form:
$email = $nap->login($nickname, $pass, $link_type);
$nickname and $pass are the user name and password. The $link_type is a constant indicating the speed of your network link, and can be selected from a set of constants ranging from LINK_14K through LINK_T3. If successful, the method returns an email address. According to the reverse-engineered specification, this should be the email address provided by the user at initial registration time, but the Napster servers now seem to return anon@napster.com for all users. If unsuccessful, login() returns undef and sets an error message which can be retrieved with error():
$message = $nap->error;
For new users, the register() method will establish a new account with the server:
$result = $nap->register($nickname, $pass, $attributes);
$attributes is a hash reference that contains various optional fields that will be presented to other users when they invoke the whois command. Possible fields include name, address, age, and even income. If the registration is successful, the method attempts to log in, returning a true value. If registration is unsuccessful, the method returns undef and sets $nap->error to some error message, typically indicating that the desired nickname is already taken.
Once logged in, you can search for a song with the search() method, download a song with the download() method, and check on the status of ongoing downloads with the downloads() method. The search() method has both short and long forms. Its simplest form looks like this:
@songs = $nap->search('Joan Baez');
This will search for any song that has Joan Baez in the title (the current Napster server treats title and artist identically). All matching songs, up to a server limit of 100, are returned as an array of MP3::Napster::Song objects.
The longer form of search() looks like this:
@songs = $nap->search(artist    => $artist,
                      title     => $title,
                      linespeed => $linespeed,
                      bitrate   => $bitrate,
                      frequency => $freq,
                      limit     => $limit);
This lets you select up to $limit songs matching a set of criteria such as the line speed of the user offering the song, the bitrate and sampling frequency of the song, and the artist and title. Again, artist and title are not currently distinguished; this is provided for future compatibility.
MP3::Napster::Song objects have a number of attributes that you can access via methods. For example, you can learn the nickname of the song's owner with its owner() method and the size of the song file with size():
$song = $songs[0];
$size = $song->size;
$owner = $song->owner;
To download a song, either call the Song object's download() method, or pass the song to the Napster object's method of the same name. Either method will return an MP3::Napster::Download object which you can then use to monitor the status of the download. Downloads proceed in a separate thread, so you can have multiple simultaneous downloads.
$download1 = $song->download;
$download2 = $nap->download($songs[1]);
By default, the download will be placed in the current directory using a file of the same name as the song title. You can change this in one of two ways. First, you can tell the Napster object to use a different destination directory with its download_dir() method:
$nap->download_dir('/tmp/mp3s');
Alternatively, you can provide the download() method with a file path or a filehandle. With a path, the method creates the indicated file and copies the song into it. If you provide a filehandle, the song will be written to it. This allows you to open pipes to MP3 decoders for real-time playback:
open (PLAYER,"|/usr/local/bin/mpg123 -");
$download = $song->download(\*PLAYER);
The Napster downloads() method returns a list of all the pending Download objects:
@pending_downloads = $nap->downloads;
The Download object has a number of methods that you can use to check the transmission status. Most useful are the bytes(), status(), and done() methods. bytes() returns the number of bytes that have been transferred, status() returns a human-readable string indicating the status of the transfer, and done() returns true if the transfer is complete. If you treat the Download object as a string, it will automatically be converted into a string containing the name of the owner and the song title. One way to print out intermittent status messages would be a polling loop like this one:
while (!$download->done) {
    print "$download: ", $download->status, "\n";
    sleep 5;
}
However, a better way to do this is to ask the Napster object to wait for TRANSFER_IN_PROGRESS events, which are issued periodically whenever a certain number of bytes are transferred (100K by default, settable using the Download object's interval() method):
$download->interval(200_000); # get an event every 200K
while (!$download->done) {
    $nap->wait_for(TRANSFER_IN_PROGRESS);
    print "$download: ", $download->status, "\n";
}
You'll learn more about the wait_for() method when we discuss MP3::Napster's event-driven model.
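The automatic string conversion of Download objects mentioned above is the kind of behavior Perl's overload pragma provides. The following is a minimal sketch of the technique, not the module's actual code: the My::Download package, its owner and title fields, and the "owner: title" format are all assumptions made for illustration.

```perl
use strict;
use warnings;

# Hypothetical stand-in for a Download object; field names are assumptions.
package My::Download;

# as_string() is called whenever the object is used as a string.
use overload '""' => \&as_string, fallback => 1;

sub new {
    my ($class, %args) = @_;
    return bless { owner => $args{owner}, title => $args{title} }, $class;
}

sub as_string {
    my $self = shift;
    return "$self->{owner}: $self->{title}";
}

package main;

my $download = My::Download->new(owner => 'some_user',
                                 title => 'La_Isla_Bonita.mp3');
print "$download\n";    # interpolation triggers as_string()
```

With fallback enabled, string comparisons such as eq also go through as_string(), so an object of this kind can be dropped into print statements and status messages without calling an accessor.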
At any time you can manually set done() to a true value to abort the transfer. Note that you'll have to remove the partially transferred file manually.
$download->done(1);
Other methods provide access to Napster's chat functions. The channels() method will return a list of MP3::Napster::Channel objects, any of which you can join().
@channels = $nap->channels;
$channels[4]->join;
You can also call the Napster object's own join_channel() method in order to join a channel you already know about:
$nap->join_channel('Funk');
Once you have joined a channel, you can send a public message with public_message():
$nap->public_message("hi guys, does anyone here do $]?");
You can obtain a list of logged-in users with users(). This returns an array of MP3::Napster::User objects, which have information about each user's active status, link speed, and number of uploads and downloads:
@users = $nap->users();
foreach my $u (@users) {
    print "$u: has been logged on ", $u->login_time, "\n";
}
The ability to send private messages to users has not yet been implemented, but this will surely be part of the API by the time you read this.
The whois() method provides detailed information on a particular user, and ping() returns true if the user's client is reachable. You might want to ping a song's owner before trying to download it:
$song->download if $song->owner->ping;
The disconnect() method politely severs the connection to the Napster server. You must call this method before exiting your script, or one or more threads will continue to run indefinitely. The best way to ensure that the threads are terminated is to include an END { } block like this one somewhere in your script:
END { $nap->disconnect }
As the preceding section shows, it's possible to write a fully-automated Perl script that connects to a Napster server, searches for new songs by your favorite artist, and downloads them. However, in order to deal with Napster's IRC-like chat facility, MP3::Napster has to be able to deal with asynchronous events such as a user posting a message. The MP3::Napster event model takes care of this.
Anything that happens while connected to a Napster server - a user posting a message, a user entering or leaving a channel, a download initiating - generates an event. Most events are generated directly by the Napster server, but a few are synthesized internally by MP3::Napster itself. There are a few dozen such events, each of which has a numeric constant exported by the module. For example, a SERVER_STATS event is sent whenever the Napster server decides to send the client its connection statistics, something that happens randomly every few minutes. The MOTD event is sent once after login when the server announces its message of the day. The USER_JOINS and USER_DEPARTS events are generated whenever a user joins or departs an active channel.
To act on a particular event, you can install a callback function using the Napster object's callback() method. Thereafter, whenever an event occurs, your function will be notified immediately. Callbacks are passed two arguments, the Napster object and an event-specific message. In most cases, the messages are strings received directly from the Napster server. For instance, when a user joins a channel, the message consists of the string <channel> <user> <sharing> <link-type>, which can be decoded with a simple pattern match. In other cases, MP3::Napster performs some preprocessing on the message. As an example, the SEARCH_RESPONSE event, which returns one item in the list of songs produced in response to a search, returns a MP3::Napster::Song object as the message. The current event code can be recovered from the Napster object using its event_code() method, and the name of the event with event().
For example, this code fragment installs a callback routine for the PUBLIC_MESSAGE_RECVD message. The message in this case is a string containing the channel name, the nickname of the user sending the message, and the message the user sent, all separated by spaces. The code creates an anonymous subroutine that processes and acts on the message. It then installs this subroutine using the Napster object's callback() method:
$mysub = sub {
    my ($napster,$message) = @_;
    my ($channel,$user,$msg) = $message =~ /^(\S+) (\S+) (.*)/;
    print "[$channel] $user says: $msg\n";
};
$nap->callback(PUBLIC_MESSAGE_RECVD, $mysub);
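The same pattern-matching approach applies to the user-joins message format described earlier, <channel> <user> <sharing> <link-type>. Here is a small sketch under the assumption that the four fields are separated by single spaces and contain no embedded whitespace; parse_user_joins() and the hash keys it returns are hypothetical names, not part of MP3::Napster.

```perl
use strict;
use warnings;

# Split a "<channel> <user> <sharing> <link-type>" message into named fields.
# Returns an empty list if the message does not have exactly four fields.
sub parse_user_joins {
    my $message = shift;
    my ($channel, $user, $sharing, $link) =
        $message =~ /^(\S+) (\S+) (\S+) (\S+)$/
        or return;
    return (channel => $channel, user      => $user,
            sharing => $sharing, link_type => $link);
}

# The sample message is made up for illustration.
my %join = parse_user_joins('Alternative some_user 42 7');
print "$join{user} has joined $join{channel}\n" if %join;
```

Returning an empty list on a mismatch lets a callback skip malformed messages with a simple truth test on the resulting hash.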
There are three events involved in downloads: TRANSFER_STARTED, sent just after the peer-to-peer transfer is initiated; TRANSFER_IN_PROGRESS, sent at user-controllable intervals during the transfer; and TRANSFER_DONE, sent after the transfer completes (whether successfully or prematurely). TRANSFER_IN_PROGRESS is useful for printing out intermittent status messages, while TRANSFER_DONE can be used to detect incomplete downloads and remove the leftover files. Here is how it's often used:
$nap->callback(TRANSFER_DONE,
    sub {
        my ($nap, $download) = @_;
        my $path = $download->local_path;
        unlink $path if $path && $download->status ne 'download complete';
    });
The TRANSFER_DONE callback subroutine gets the Napster and Download objects. It recovers the path to the local file by calling the local_path() method. If the path exists and the status() method does not return "download complete", the path is unlinked.
Sometimes it's more natural to wait for a particular event to occur rather than install a callback. The wait_for() method is designed for this purpose. Pass wait_for() a single event code or an anonymous list of such codes, and optionally a timeout in seconds. It will put the current thread to sleep until one of the listed events occurs or the timeout expires. In the former case, wait_for() returns a two-element list consisting of the result code and a message (event-dependent). In the case of a timeout, wait_for() returns an empty list.
This facility provides a simple way to wait for all pending downloads to complete before your script exits. In the following example, we assume that @songs contains a list of songs you desire to download.
# initiate downloads
foreach (@songs) { $_->download }

# wait for them to finish
while (@d = $nap->downloads) {
    warn "waiting for ", scalar(@d), " downloads to finish...\n";
    # sleep until one is done
    ($event, $download) = $nap->wait_for(TRANSFER_DONE);
    warn "$download is done...\n";
}
The accompanying listing shows the code for the simple interactive client I used to test and debug the MP3::Napster module. We'll go over it lightly; you can fill in the details from the description of the API I gave above.
The script begins by creating a new MP3::Napster object and declaring an END{} block to call the disconnect() method before the script exits. If need be, it creates a directory to hold downloaded songs and points the Napster object at it by calling the download_dir() method.
Now the script sets up the callbacks. The setup_callbacks() subroutine is lengthy but straightforward. Callbacks are installed for three messages: PUBLIC_MESSAGE_RECVD, USER_JOINS, and USER_DEPARTS, all of which support the chat facility. The CHANNEL_ENTRY event occurs when the server returns a list of channels that the user can join, while the CHANNEL_TOPIC event carries a welcome message sent when a user first joins a channel. More callbacks are installed for SEARCH_RESPONSE, sent by the server every time it returns a song in response to a search query, as well as for SERVER_STATS and MOTD, the message of the day. In addition, the script installs callbacks for TRANSFER_IN_PROGRESS and TRANSFER_DONE. By and large, the callbacks simply print out status messages so that the user knows what's going on.
After installing the callbacks, the client attempts to log in. The login() routine prompts the user for his name and password. New registrations are not yet supported by the client, but currently the server accepts any unused username/password combination.
If the login is successful, the script enters a loop in which it accepts lines of input from standard input. If the input begins with a slash command (/join, for example), it passes the command to the appropriate subroutine. Otherwise it treats the line as a public message, and attempts to send it to the current channel.
The individual commands are simple front ends to the MP3::Napster API methods. For example, the /search command invokes a subroutine named search(), which in turn passes the rest of the command line to MP3::Napster->search(). The returned Song objects, if any, are placed in a global array named @SEARCH. The /download command invokes download(), which splits the rest of the command line into a set of numeric arguments. These arguments are then used to index into the @SEARCH array and call each selected Song object's download() method.
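The command handling just described can be sketched with a dispatch table. This is an illustration of the general technique rather than the client's actual code: handle_line(), the %COMMANDS table, and the strings standing in for Song objects are all assumptions, and the handlers return text instead of calling MP3::Napster.

```perl
use strict;
use warnings;

# Stand-in for the Song objects a real search would store.
my @SEARCH = ('song zero', 'song one', 'song two');

# Dispatch table mapping slash commands to handlers (hypothetical names).
my %COMMANDS = (
    search   => sub { return "search: $_[0]" },
    download => sub {
        # Pull the numbers out of the argument string and index into @SEARCH,
        # silently dropping indexes that are out of range.
        my @picked = grep { defined } map { $SEARCH[$_] } $_[0] =~ /(\d+)/g;
        return "download: " . join(', ', @picked);
    },
);

# Route one line of input: slash commands go to a handler,
# anything else is treated as a public message.
sub handle_line {
    my $line = shift;
    if (my ($cmd, $args) = $line =~ m{^/(\w+)\s*(.*)$}) {
        my $handler = $COMMANDS{$cmd} or return "unknown command: /$cmd";
        return $handler->($args);
    }
    return "public message: $line";
}

print handle_line($_), "\n" for '/search bach', '/download 0 2', 'hello all';
```

Keeping the commands in a hash means a new command is one more table entry, and the input loop itself never changes.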
A few words about the design of the MP3::Napster module itself. Because it operates in a multithreaded environment, I had to take special care to make it thread-safe. This involves the following general precautions:
Avoid updating global variables
The problem with threads is that a context switch can occur between one thread and another without warning. If one thread is in the midst of working with a global variable, and another thread changes the global's value, very confusing results will ensue. Using object data (e.g. the contents of a blessed hash) limits the havoc that a thread context switch can wreak because it is easier to control access to an object's data, and there is often a one-to-one correspondence between an object and a thread.
To further ensure stability, critical methods can be marked as locked. An example is the MP3::Napster download_dir() method, which gets and sets the directory in which downloaded songs are stored:
sub download_dir {
    use attrs qw(locked method);
    my $self = shift;
    return defined $_[0] ? $self->{download_dir} = $_[0]
                         : $self->{download_dir};
}
After recovering the object reference from the subroutine argument list, the method looks at what's left. If there is a defined value, it is used to change the value of the download_dir key. Otherwise, the current value of download_dir is returned. The section relevant to threading is the use attrs line. On entry into the method, Perl tries to lock the object. It retains the lock until the method has finished executing, preventing other threads from obtaining the value of download_dir until the update has completed.
All methods that alter MP3::Napster's state variables are locked in this way.
Another issue in designing MP3::Napster was dealing with the asynchronous and unpredictable nature of messages coming from the Napster server. In order to handle this, the module launches a separate thread which does nothing but listen for incoming messages and pass them to callbacks for processing. The interesting action begins with the connect() method, which is called during the Napster initialization procedure:
sub connect {
    use attrs qw(locked method);
    my $self = shift;
    return $self->error('No server address defined')
        unless $self->server;
    my $sock = IO::Socket::INET->new($self->server);
    return $self->error("Could not connect to napster server: $!")
        unless $sock;
    $self->install_default_callbacks;
    return unless $self->{receive_tid} = Thread->new(\&receive_loop,$self);
    $self->{receive_tid}->detach;
    return $self->socket($sock);
}
The connect() method runs as a locked method. It recovers the address of a Napster server from the server() method (which I won't show here), and attempts to connect a TCP/IP socket to the server using Graham Barr's IO::Socket::INET module. If successful, connect() installs a few default callback routines, and then calls Thread->new() to launch a new thread running the receive_loop() method. If successful, the new thread is made independent of the main thread by calling its detach() method.
The receive_loop() method runs in the new thread. It's extremely simple:
sub receive_loop {
    my $self = shift;
    while (my($rc,$message) = $self->recv) {
        $self->process_message($rc, $message);
    }
}
Basically, receive_loop() calls the recv() method repeatedly to retrieve an event code and message from the server. These values are then passed to another method, named process_message(), for processing. The recv() method is also straightforward:
sub recv {
    my $self = shift;
    my $sock = $self->socket;
    # read four bytes from the input stream
    my $data;
    my $bytes = read($sock, $data, 4);
    return unless $bytes;
    # unpack it into length and type
    my ($length,$event) = unpack("vv", $data);
    # read the rest of the data
    if ($length > 0) {
        return unless read($sock,$data,$length);
        return ($event,$data);
    }
    return $event;
}
The events sent by the Napster server consist of a two-byte message length, a two-byte event code, and a variable-length message. recv() first reads four bytes of data from the socket and unpacks the data into the length and event code. If the message length is greater than zero, recv() performs a second read to get the message data. This information is then returned to the caller.
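To make the framing concrete, here is a self-contained sketch that builds and parses messages in this layout using an in-memory filehandle instead of a socket. The encode_message() and decode_message() helpers and the event codes 2 and 214 are placeholders of my own, not names or values from the module or the specification. The "v" template denotes an unsigned 16-bit little-endian integer, matching the unpack() call in recv().

```perl
use strict;
use warnings;

# Build one message: 2-byte length, 2-byte event code, then the payload.
sub encode_message {
    my ($event, $data) = @_;
    $data = '' unless defined $data;
    return pack('vv', length $data, $event) . $data;
}

# Read one message from a filehandle.
# Returns (event, data), or an empty list at end of input or on a short read.
sub decode_message {
    my $fh  = shift;
    my $got = read($fh, my $header, 4);
    return unless defined $got && $got == 4;
    my ($length, $event) = unpack('vv', $header);
    my $data = '';
    if ($length > 0) {
        my $n = read($fh, $data, $length);
        return unless defined $n && $n == $length;
    }
    return ($event, $data);
}

# Round-trip two messages, the second with an empty payload.
my $stream = encode_message(2, 'hello') . encode_message(214);
open my $fh, '<', \$stream or die "cannot open in-memory handle: $!";
while (my ($event, $data) = decode_message($fh)) {
    printf "event %d, %d bytes: %s\n", $event, length $data, $data;
}
```

Unlike recv() above, this sketch treats a short read as a failure. On a real network socket a read can legitimately return fewer bytes than requested, so a production reader would loop until the full header and payload have arrived.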
Once this is all running, the client has two main threads of execution. One is the original thread used to send command data to the Napster server, and the other is the event thread that runs the receive_loop(). Other threads are launched as necessary to handle the peer-to-peer transfer and for other specialized tasks.
Although the two threads are essentially independent, they do need to be synchronized from time to time. A typical scenario occurs when the command thread sends out a search request and the event thread receives and processes the result. The command thread wants to clear the previous search results if any, issue the search command, and then wait for the event thread to return the new search results. But how can the command thread do this when it has no link to the event thread?
The key is to use a locked variable and a combination of the cond_wait() and cond_signal() functions, both of which are part of Perl's standard Thread package.
Here's how it works. One thread selects a pre-agreed on variable called a condition variable and locks it. The second thread continues to run until it reaches a synchronization point, at which point it tries to lock the same variable. However, the first thread already has the lock on this variable, so the second thread stalls. The first thread is then free to do whatever setup it likes, such as clearing state variables and sending queries to the server. When it has finished its setup it calls cond_wait() on the locked variable. This call atomically puts the first thread to sleep and unlocks the variable.
Now that the variable is unlocked, the second thread wakes up and acquires the lock. It does whatever processing it needs to (such as parsing the incoming search results). The second thread then calls cond_signal() on the locked variable. This wakes up the first thread, which reacquires the lock. The first thread is now free to act on the information processed by the second thread while the first one was asleep. The second thread, meanwhile, again stalls at the synchronization point until the first thread again calls cond_wait() or simply gives up the lock for good.
The wait_for() and process_message() methods illustrate how this works. wait_for() is called by the command thread whenever it needs to stop and wait for a particular event or set of events. A simplified version of the subroutine is shown below:
sub wait_for {
    my $self = shift;
    my $ec = shift;
    my %ok = (ref $ec eq 'ARRAY') ? map {$_=>1} @$ec : ($ec=>1);
    lock $self->{ec};
    $self->ec('');
    foreach (keys %ok) { $self->message($_,'') }
    while (1) {
        cond_wait $self->{ec};
        last if $ok{$self->ec};
    }
    return wantarray ? ($self->ec,$self->message($self->ec))
                     : $self->ec;
}
The wait_for() method accepts either a single event code or an array of such codes, and stores them in a variable named $ec (for "event code"). If $ec is a scalar it is stored as a key into the %ok hash. Otherwise it is dereferenced and all the desired event codes are stored into this hash.
The condition variable in this case is a key in the Napster object hash named ec. In addition to being a condition variable, it is also used by the module to store the last event code received. wait_for() locks the condition variable, thereby preventing the event thread from changing it until it is unlocked. The method now clears the contents of ec by calling the ec() accessor to set it to an empty string, and furthermore clears the messages associated with any of the desired events, by calling the message() accessor.
The method now enters a while() loop. It first calls cond_wait() in order to release the lock on the condition variable and allow other threads (and in particular the event thread) to access it. The method then goes to sleep until another thread calls cond_signal() on the variable. When it wakes up, wait_for() checks what's in ec by calling the ec() method. If this is one of the desired events, the loop terminates. Otherwise it goes back to waiting.
At the end of the subroutine, the method returns the event code and event message in an array context, and the returned event code only in a scalar context.
The actual subroutine differs from this by having an additional half-dozen lines of code that deal with timeouts. If a timeout is requested, the method launches a new thread that sleeps for the indicated period of time and then locks ec, sets its value to TIMEOUT, and calls cond_signal. When wait_for() wakes up, it sees that ec() contains TIMEOUT and returns an undefined value.
The process_message() method runs within the event thread. Here is a slightly simplified version which excludes some debugging code:
sub process_message {
    my $self = shift;
    my ($ec, $message) = @_;
    # wait until someone has unlocked {ec}
    lock $self->{ec};
    $self->ec($ec);
    $self->error($message) if $ERRORS{$ec};
    # transform some messages
    $message = $MESSAGE_CONSTRUCTOR{$ec}->new($self, $message)
        if $MESSAGE_CONSTRUCTOR{$ec};
    if ($MULTILINE_CODE{$ec}) {
        lock $self;
        push (@{$self->{messages}{$ec}}, $message);
    } else {
        $self->message($ec, $message);
    }
    $self->callback($ec)->($self, $message) if $self->callback($ec);
    cond_signal $self->{ec};
}
The method begins by copying the event code and associated message from the subroutine array. It then acquires a lock on ec, possibly synchronizing itself on the wait_for() method's cond_wait(). It uses the ec() accessor to remember the current event code, and also checks a global %ERRORS hash containing a list of event codes that signify error conditions. If appropriate, the method remembers the error message by calling the error() accessor.
Some event messages are to be treated as plain strings, but others are transformed into specialized objects (such as Song and User objects). The next two lines of code check a list of special event codes and invoke the new() constructor to transform any messages falling into this category. Similarly, some events are multivalued (such as search results), while others are single valued (such as the current server statistics). The method checks the %MULTILINE_CODE hash, and if the event code falls into this category its message is pushed onto an array of messages associated with this event. Otherwise the message() accessor is used to replace the current event message.
process_message() now invokes the callback for the event, if any. Consequently the callback subroutine will be executed within the event thread rather than the command thread. Finally, the method invokes cond_signal() to wake up any threads waiting for an update to ec.
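Stripped of locking and signalling, the table-driven part of process_message() can be tried out in a single thread. The sketch below is a simplification under stated assumptions: the event codes, the contents of the two tables, and the plain %messages and %callbacks hashes are invented for illustration, and the constructor table holds code references where the module calls a class's new() method.

```perl
use strict;
use warnings;

# Hypothetical event codes and tables; the real module defines its own.
use constant { SEARCH_RESPONSE => 1, SERVER_STATS => 2 };
my %MULTILINE_CODE      = (SEARCH_RESPONSE() => 1);
my %MESSAGE_CONSTRUCTOR = (SEARCH_RESPONSE() => sub { return { title => $_[0] } });

my (%messages, %callbacks);

sub process_message {
    my ($ec, $message) = @_;

    # Turn selected raw strings into richer values.
    $message = $MESSAGE_CONSTRUCTOR{$ec}->($message)
        if $MESSAGE_CONSTRUCTOR{$ec};

    if ($MULTILINE_CODE{$ec}) {
        push @{ $messages{$ec} }, $message;   # accumulate multivalued events
    } else {
        $messages{$ec} = $message;            # keep only the latest value
    }

    # Run the callback, if one is installed for this event.
    $callbacks{$ec}->($message) if $callbacks{$ec};
}

$callbacks{SERVER_STATS()} = sub { print "stats: $_[0]\n" };

process_message(SEARCH_RESPONSE, 'first song');
process_message(SEARCH_RESPONSE, 'second song');
process_message(SERVER_STATS, 'some statistics text');
print scalar @{ $messages{SEARCH_RESPONSE()} }, " search results stored\n";
```

Driving the behavior from lookup tables keeps the method short: supporting a new multivalued or object-valued event means adding a table entry rather than another branch.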
As I said at the beginning of this article, MP3::Napster is still a work in progress. Not all aspects of the Napster protocol are implemented. In particular, the module can't yet handle incoming connections (uploads). This will involve creating yet another thread to listen for and process incoming connections. Also missing are the ability to send private messages to other users and a number of functions that require administrative access, such as the ability to boot a user off the server. Most of these functions will be available by the time you read this article.
The command-line client is extremely primitive. Because it uses no screen control, your typing can be interrupted by asynchronous messages from the server and other users. I get around this by running the client in an Emacs shell window, but clearly this isn't for everyone. It would be a nice exercise to convert the client into a curses-based application for terminal windows, or perhaps a Tk or GTk graphical application. Any takers?