# POPFILE LOADABLE MODULE
#
# Packaging note: /usr/share/popfile/POPFile/MQ.pm is in popfile
# 1.1.3+dfsg-0ubuntu1. This file is owned by root:root, with mode 0o644.
package POPFile::MQ;
use POPFile::Module;
@ISA = ( "POPFile::Module" );
#----------------------------------------------------------------------------
#
# This module handles POPFile's message queue. Every POPFile::Module is
# able to register with the MQ for specific message types and can also
# send messages without having to know which modules need to receive
# its messages.
#
# Message delivery is asynchronous and guaranteed, as well as guaranteed
# first in, first out (FIFO) per process.
#
# The following public functions are defined:
#
# register() - register for a specific message type and pass an object
# reference. will call that object's deliver() method to
# deliver messages
#
# post() - send a message of a specific type
#
# The current list of types is
#
# UIREG Register a UI component, message is the component type
# and the element and reference to the
# object registering (comes from any component)
#
# TICKD Occurs when an hour has passed since the last TICKD (this
# is generated by the POPFile::Logger module)
#
# LOGIN Occurs when a proxy logs into a remote server, the message
# is the username sent
#
# COMIT Sent when an item is committed to the history through a call
# to POPFile::History::commit_slot
#
# RELSE Sent when a session key is being released by a client
#
# Copyright (c) 2001-2011 John Graham-Cumming
#
# This file is part of POPFile
#
# POPFile is free software; you can redistribute it and/or modify it
# under the terms of version 2 of the GNU General Public License as
# published by the Free Software Foundation.
#
# POPFile is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with POPFile; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
#----------------------------------------------------------------------------
use strict;
use warnings;
use locale;
use POSIX ":sys_wait_h";
#----------------------------------------------------------------------------
# new
#
# Class new() function
#----------------------------------------------------------------------------
sub new
{
    my $class = shift;

    # Start from a plain POPFile::Module object and re-bless it into the
    # requested class so whatever the base constructor set up is kept
    my $self = bless( POPFile::Module->new(), $class );

    # queue__    : pending messages per message type, filled by post()
    # waiters__  : registered objects per message type, filled by register()
    # children__ : child PID => pipe handle, filled by postfork()
    @{$self}{ qw( queue__ waiters__ children__ ) } = ( {}, {}, {} );

    # Remember the PID of the creating process; post() compares it with
    # $$ to choose between queueing locally and writing up the pipe
    $self->{pid__} = $$;

    $self->name( 'mq' );

    return $self;
}
#----------------------------------------------------------------------------
#
# service
#
# Called to handle pending tasks for the module. Here we flush all queues
#
#----------------------------------------------------------------------------
sub service
{
    my ( $self ) = @_;

    # First pull in whatever the children have written to their pipes;
    # flush_child_data_() hands each message it reads to post()
    my @pids = keys %{ $self->{children__} };

    for my $pid ( @pids ) {
        $self->flush_child_data_( $self->{children__}{$pid} );
    }

    # Then drain every queue, taking the message types in sorted order
    # and the messages of each type oldest first
    for my $type ( sort keys %{ $self->{queue__} } ) {
        while ( my $entry = shift @{ $self->{queue__}{$type} } ) {
            my @message = @{$entry};
            my $label   = "$type (" . join( ':', @message ) . ')';

            $self->log_( 2, "Message $label ready for delivery" );

            # Every object registered for this type gets its own call
            for my $waiter ( @{ $self->{waiters__}{$type} } ) {
                $self->log_( 2, "Delivering message $label to " . $waiter->name() );
                $waiter->deliver( $type, @message );
            }
        }
    }

    return 1;
}
#----------------------------------------------------------------------------
#
# stop
#
# Called when POPFile is closing down, this is the last method that
# will get called before the object is destroyed. There is no return
# value from stop().
#
#----------------------------------------------------------------------------
sub stop
{
    my ( $self ) = @_;

    # One final service() pass so nothing still queued, or still sitting
    # in a child's pipe, is lost
    $self->service();

    # Close and forget every child pipe
    my @pids = keys %{ $self->{children__} };

    while ( @pids ) {
        my $pid = shift @pids;

        close $self->{children__}{$pid};
        delete $self->{children__}{$pid};
    }
}
#----------------------------------------------------------------------------
#
# yield_
#
# Called by a child process to allow the parent to do work, this only
# does anything in the case where we didn't fork for the child process
#
#----------------------------------------------------------------------------
sub yield_
{
    my ( $self, $pipe, $pid ) = @_;

    # Nothing happens when $pid is 0; for any other value the pipe is
    # drained through flush_child_data_()
    $self->flush_child_data_( $pipe ) if $pid != 0;
}
#----------------------------------------------------------------------------
#
# forked
#
# This is called when some module forks POPFile and is within the
# context of the child process so that this module can close any
# duplicated file handles that are not needed.
#
# $writer The writing end of a pipe that can be used to send up from
# the child
#
# There is no return value from this method
#
#----------------------------------------------------------------------------
sub forked
{
    my ( $self, $writer ) = @_;

    # post() writes to this handle whenever $$ differs from the PID
    # recorded by new()
    $self->{writer__} = $writer;

    # Close and drop every reader handle held in children__
    foreach my $pid ( keys %{ $self->{children__} } ) {
        my $reader = delete $self->{children__}{$pid};

        close $reader;
    }
}
#----------------------------------------------------------------------------
#
# postfork
#
# This is called when some module has just forked POPFile. It is
# called in the parent process.
#
# $pid The process ID of the new child process
# $reader The reading end of a pipe that can be used to read messages
# from the child
#
# There is no return value from this method
#
#----------------------------------------------------------------------------
sub postfork
{
    my ( $self, $pid, $reader ) = @_;

    # Remember the child's pipe under its PID; service(), reaper() and
    # stop() all walk this hash
    my $children = ( $self->{children__} ||= {} );
    $children->{$pid} = $reader;

    $self->log_( 2, sprintf( 'Parent: postfork() called for pid %s, reader %s', $pid, $reader ) );
}
#----------------------------------------------------------------------------
#
# reaper
#
# Called when a child process terminates somewhere in POPFile. The
# object should check to see if it was one of its children and do any
# necessary processing by calling waitpid() on any child handles it
# has
#
# There is no return value from this method
#
#----------------------------------------------------------------------------
sub reaper
{
    my ( $self ) = @_;

    # Poll each known child without blocking. A child is finished only
    # when waitpid() hands back its own PID; any other result leaves the
    # entry alone
    for my $pid ( keys %{ $self->{children__} } ) {
        next if waitpid( $pid, WNOHANG() ) != $pid;

        # Collect what the child left in its pipe before closing it
        $self->flush_child_data_( $self->{children__}{$pid} );

        close $self->{children__}{$pid};
        delete $self->{children__}{$pid};

        my $remaining = scalar( keys %{ $self->{children__} } );

        $self->log_( 0, "Done with $pid ($remaining to go)" );
    }
}
#----------------------------------------------------------------------------
#
# read_pipe_
#
# reads a single message from a pipe in a cross-platform way.
# returns undef if the pipe has no message
#
# $handle The handle of the pipe to read
#
#----------------------------------------------------------------------------
sub read_pipe_
{
    my ( $self, $handle ) = @_;

    # Returns exactly one scalar in every context: the next message line
    # (newline included) or undef when nothing is available.
    #
    # $self->{pipeready_} is a code reference that is not set in this
    # file; it is called with the handle and its result decides whether
    # a read is attempted

    if ( $^O eq "MSWin32" ) {

        # bypasses bug in -s $pipe under ActivePerl

        my $message;                                   # PROFILE PLATFORM START MSWin32

        if ( $self->{pipeready_}->( $handle ) ) {

            # add data to the pipe cache whenever the pipe is ready,
            # reading as many bytes as -s reports for the handle

            sysread( $handle, my $string, -s $handle );

            # push messages onto the end of our cache

            $self->{pipe_cache__} .= $string;
        }

        # pop the oldest message (one complete line) off the cache

        $message = $1 if ( defined( $self->{pipe_cache__} ) &&          # PROFILE BLOCK START
                           ( $self->{pipe_cache__} =~ s/(.*?\n)// ) );  # PROFILE BLOCK STOP

        return $message;                               # PROFILE PLATFORM STOP
    }

    # do things normally

    if ( $self->{pipeready_}->( $handle ) ) {

        # scalar() pins readline to a single line. A bare <$handle> in a
        # return statement takes the caller's context, so a list-context
        # call would read every line up to end-of-file and block on a
        # pipe whose writer is still open

        return scalar <$handle>;
    }

    # Explicit undef on purpose: flush_child_data_() tests the result
    # with defined(), and this keeps the return a single scalar

    return undef;
}
#----------------------------------------------------------------------------
#
# flush_child_data_
#
# Called to flush data from the pipe of each child as we go, I did
# this because there appears to be a problem on Windows where the pipe
# gets a lot of read data in it and then causes the child not to be
# terminated even though we are done. Also this is nice because we
# deal with the messages on the fly
#
# $handle The handle of the child's pipe
#
#----------------------------------------------------------------------------
sub flush_child_data_
{
    my ( $self, $handle ) = @_;

    # Read messages until read_pipe_() reports that nothing is left and
    # pass each well-formed one ("TYPE:param:param...") on to post()

    while ( defined( my $message = $self->read_pipe_( $handle ) ) ) {
        if ( $message =~ /([^:]+):([^\r\n]*)/ ) {
            my ( $type, $payload ) = ( $1, $2 );

            # $payload is always defined after a successful match, so it
            # is split as it stands. Falling back to '' whenever it is
            # false would throw away a payload that is exactly "0"

            $self->post( $type, split( ':', $payload ) );
        } else {
            $self->log_( 2, "Recieved invalid message from child: $message" );
        }
    }
}
#----------------------------------------------------------------------------
#
# register
#
# When a module wants to receive specific message types it calls this
# method with the type of message it wants to receive and a reference
# to the object that will receive the messages through its deliver()
# method
#
# $type A string identifying the message type
# $callback Reference to an object providing name() and deliver()
#
#----------------------------------------------------------------------------
sub register
{
    my ( $self, $type, $callback ) = @_;

    # Create the waiter list for this type on first use, then append;
    # the same object may be appended more than once
    my $waiters = ( $self->{waiters__}{$type} ||= [] );

    return push @{$waiters}, $callback;
}
#----------------------------------------------------------------------------
#
# post
#
# Called to send a message through the message queue
#
# $type A string identifying the message type
# @message The message (list of parameters)
#
#----------------------------------------------------------------------------
sub post
{
    my ( $self, $type, @message ) = @_;

    # Colon-joined form of the message: used in the log lines and, when
    # posting from a child, as the text sent up the pipe
    my $flat = join( ':', @message );

    $self->log_( 2, "post $type ($flat)" );

    # If we are in the parent process then just stick this on the queue,
    # otherwise write it up the pipe.

    if ( $$ == $self->{pid__} ) {

        # Messages nobody has registered for are dropped straight away
        if ( exists( $self->{waiters__}{$type} ) ) {
            $self->log_( 2, "queuing post $type ($flat)" );
            push @{ $self->{queue__}{$type} }, \@message;

            # Report the element count; $#{...} is the last index and
            # would always read one short of the real length
            $self->log_( 2, "$type queue length now " . scalar( @{ $self->{queue__}{$type} } ) );
        } else {
            $self->log_( 2, "dropping post $type ($flat)" );
        }
    } else {

        # writer__ is the handle stored by forked()
        my $pipe = $self->{writer__};

        $self->log_( 2, "sending post $type ($flat) to parent $pipe" );

        # One message per line; flush_child_data_() parses this format.
        # A failed write is logged rather than ignored
        print {$pipe} "$type:$flat\n"
            or $self->log_( 0, "failed to send post $type ($flat) to parent: $!" );
    }
}
1;
|