Imported Upstream version 2.5.11
[libapache-mod-security.git] / apache2 / mlogc-src / mlogc.c
1 /*
2  * ModSecurity for Apache 2.x, http://www.modsecurity.org/
3  * Copyright (c) 2004-2009 Breach Security, Inc. (http://www.breach.com/)
4  *
5  * This product is released under the terms of the General Public Licence,
6  * version 2 (GPLv2). Please refer to the file LICENSE (included with this
7  * distribution) which contains the complete text of the licence.
8  *
9  * There are special exceptions to the terms and conditions of the GPL
10  * as it is applied to this software. View the full text of the exception in
11  * file MODSECURITY_LICENSING_EXCEPTION in the directory of this software
12  * distribution.
13  *
14  * If any of the files related to licensing are missing or if you have any
15  * other questions related to licensing please contact Breach Security, Inc.
16  * directly using the email address support@breach.com.
17  *
18  */
19
20 #include <apr.h>
21 #include <apr_errno.h>
22 #include <apr_general.h>
23 #include <apr_file_io.h>
24 #include <apr_file_info.h>
25 #include <apr_hash.h>
26 #include <apr_lib.h>
27 #include <apr_strings.h>
28 #include <apr_signal.h>
29 #include <apr_thread_proc.h>
30 #include <apr_global_mutex.h>
31 #include <apr_getopt.h>
32 #include <apr_version.h>
33 #if APR_HAVE_UNISTD_H
34 #include <unistd.h>         /* for getpid() */
35 #endif
36 #include <pcre.h>
37 #include <curl/curl.h>
38 #include <fcntl.h>
39 #include <sys/stat.h>
40 #include <sys/types.h>
41
42 #include "msc_release.h"
43
44 static void logc_shutdown(int rc);
45 static void create_new_worker(int lock);
46 static void error_log(int level, void *thread, const char *text, ...) PRINTF_ATTRIBUTE(3,4);
47
48
49 /* -- Constants -- */
50
51 /* Error log levels. */
52 #define LOG_ERROR           1
53 #define LOG_WARNING         2
54 #define LOG_NOTICE          3
55 #define LOG_DEBUG           4
56 #define LOG_DEBUG2          5
57
58 /* The management thread will wake up every five seconds. */
59 #define MANAGER_SLEEP       5000000
60 #define MANAGER_SUBSLEEP    10000
61
62 /* Hack to allow multiple mlogc with single delete */
63 #define KEEP_ENTRIES_REMOVE_HACK   2600
64 #define KEEP_ENTRIES_REMOVE_TIME   0l
65 #ifdef TEST_HACK
66 #define TEST_WITH_RAND_SLEEP(n) \
67 do { \
68     int sec = rand()/(RAND_MAX/n); \
69     error_log(LOG_DEBUG2, NULL, "TEST_HACK: Sleeping for %ds", sec); \
70     apr_sleep(apr_time_from_sec(sec)); \
71 } while(0)
72 #else
73 #define TEST_WITH_RAND_SLEEP(n)
74 #endif
75
76 #define CAPTUREVECTORSIZE   60
77 #define PIPE_BUF_SIZE       65536
78 #define MEMALLOC_ERROR_MSG  "Memory allocation failed!"
79 #define VERSION             MODSEC_VERSION
80
81 #define CMDLINE_OPTS        "fvh"
82
83 #define TXIN            0
84 #define TXOUT           1
85
86 #define STATUSBUF_SIZE      256
87
88 #define ISHEXCHAR(X) (((X >= '0')&&(X <= '9')) || ((X >= 'a')&&(X <= 'f')) || ((X >= 'A')&&(X <= 'F')))
89
90 /* -- Regex Patterns -- */
91
92 /**
93  * This regular expression is used to parse the entire
94  * log line we receive from Apache. The REQUEST_LINE is
95  * treated as a single parameter to allow for invalid
96  * requests.
97  */
98 const char logline_pattern[] =
99     "^(\\S+)"
100     "\\ (\\S+)\\ (\\S+)\\ (\\S+)"
101     "\\ \\[([^:]+):(\\d+:\\d+:\\d+)\\ ([^\\]]+)\\]"
102     "\\ \"(.*)\""
103     "\\ (\\d+)\\ (\\S+)"
104     "\\ \"(.*)\"\\ \"(.*)\""
105     "\\ (\\S+)\\ \"(.*)\""
106     "\\ /?(\\S+)\\ (\\d+)\\ (\\d+)"
107     "\\ (\\S+)"
108     "(.*)$";
109
110
111 /**
112  * This regular expression can be used to parse
113  * a REQUEST_LINE field into method, URI, and
114  * protocol.
115  */
116 const char requestline_pattern[] =
117     "(\\S+)\\ (.*?)\\ (\\S+)";
118
119
120 /* -- Structures -- */
121
122 typedef struct {
123     unsigned long int        id;
124     const char              *line;
125     apr_size_t               line_size;
126 } entry_t;
127
128
129 /* -- Global variables -- */
130
131 pid_t                        logc_pid = 0;
132 const char                  *conffile = NULL;
133 const char                  *lockfile = NULL;
134 int                          have_read_data = 0;
135 int                          checkpoint_interval = 60;
136 apr_time_t                   checkpoint_time_last = 0;
137 const char                  *collector_root = NULL;
138 apr_table_t                 *conf = NULL;
139 const char                  *console_uri = NULL;
140 apr_array_header_t          *curl_handles = NULL;
141 int                          current_workers = 0;
142 int                          management_thread_active = 0;
143 unsigned long int            entry_counter = 1;
144 const char                  *error_log_path = NULL;
145 apr_file_t                  *error_log_fd = NULL;
146 int                          error_log_level = 2;
147 apr_hash_t                  *in_progress = NULL;
148 int                          keep_alive = 150;               /* Not used yet. */
149 int                          keep_alive_timeout = 300;       /* Not used yet. */
150 int                          keep_entries = 0;
151 const char                  *log_repository = NULL;
152 void                        *logline_regex = NULL;
153 int                          max_connections = 10;
154 int                          max_worker_requests = 1000;
155 apr_global_mutex_t          *gmutex = NULL;
156 apr_thread_mutex_t          *mutex = NULL;
157 apr_pool_t                  *pool = NULL;
158 apr_pool_t                  *thread_pool = NULL;
159 apr_pool_t                  *recv_pool = NULL;
160 apr_array_header_t          *queue = NULL;
161 const char                  *queue_path = NULL;
162 /* apr_time_t                   queue_time = 0; */
163 void                        *requestline_regex = NULL;
164 int                          running = 0;
165 const char                  *sensor_password = NULL;
166 const char                  *sensor_username = NULL;
167 int                          server_error = 0;
168 apr_time_t                   server_error_last_check_time = 0;
169 int                          server_error_timeout = 60;
170 int                          startup_delay = 100;
171 int                          transaction_delay = 100;
172 const char                  *transaction_log_path = NULL;
173 apr_file_t                  *transaction_log_fd = NULL;
174
175
176 /* -- Commandline opts -- */
177 int                          opt_force = 0;
178
179
180 /* -- Code -- */
181
182 static char *_log_escape(apr_pool_t *mp, const char *input, apr_size_t input_len)
183 {
184     static const char c2x_table[] = "0123456789abcdef";
185     unsigned char *d = NULL;
186     char *ret = NULL;
187     unsigned long int i;
188
189     if (input == NULL) return NULL;
190
191     ret = apr_palloc(mp, input_len * 4 + 1);
192     if (ret == NULL) return NULL;
193     d = (unsigned char *)ret;
194
195     i = 0;
196     while(i < input_len) {
197         switch(input[i]) {
198             case '"' :
199                 *d++ = '\\';
200                 *d++ = '"';
201                 break;
202             case '\b' :
203                 *d++ = '\\';
204                 *d++ = 'b';
205                 break;
206             case '\n' :
207                 *d++ = '\\';
208                 *d++ = 'n';
209                 break;
210             case '\r' :
211                 *d++ = '\\';
212                 *d++ = 'r';
213                 break;
214             case '\t' :
215                 *d++ = '\\';
216                 *d++ = 't';
217                 break;
218             case '\v' :
219                 *d++ = '\\';
220                 *d++ = 'v';
221                 break;
222             case '\\' :
223                 *d++ = '\\';
224                 *d++ = '\\';
225                 break;
226             default :
227                 if ((input[i] <= 0x1f)||(input[i] >= 0x7f)) {
228                     *d++ = '\\';
229                     *d++ = 'x';
230                     *d++ = c2x_table[input[i] >> 4];
231                     *d++ = c2x_table[input[i] & 0x0f];
232                 } else {
233                     *d++ = input[i];
234                 }
235                 break;
236         }
237
238         i++;
239     }
240
241     *d = 0;
242
243     return ret;
244 }
245
246 /**
247  * Converts a byte given as its hexadecimal representation
248  * into a proper byte. Handles uppercase and lowercase letters
249  * but does not check for overflows.
250  */
251 static unsigned char x2c(unsigned char *what) {
252     register unsigned char digit;
253
254     digit = (what[0] >= 'A' ? ((what[0] & 0xdf) - 'A') + 10 : (what[0] - '0'));
255     digit *= 16;
256     digit += (what[1] >= 'A' ? ((what[1] & 0xdf) - 'A') + 10 : (what[1] - '0'));
257
258     return digit;
259 }
260
261 /**
262  * URL Decodes a string in-place
263  */
264 static int urldecode_inplace(unsigned char *input, apr_size_t input_len) {
265     unsigned char *d = (unsigned char *)input;
266     apr_size_t i;
267
268     if (input == NULL) return 0;
269
270     i = 0;
271     while (i < input_len) {
272         if (input[i] == '%') {
273             /* Character is a percent sign. */
274
275             /* Are there enough bytes available? */
276             if (i + 2 < input_len) {
277                 char c1 = input[i + 1];
278                 char c2 = input[i + 2];
279
280                 if (ISHEXCHAR(c1) && ISHEXCHAR(c2)) {
281                     /* Valid encoding - decode it. */
282                     *d++ = x2c(&input[i + 1]);
283                     i += 3;
284                 } else {
285                     /* Not a valid encoding, skip this % */
286                     *d++ = input[i++];
287                 }
288             } else {
289                 /* Not enough bytes available, copy the raw bytes. */
290                 *d++ = input[i++];
291             }
292         } else {
293             /* Character is not a percent sign. */
294             if (input[i] == '+') {
295                 *d++ = ' ';
296             } else {
297                 *d++ = input[i];
298             }
299             i++;
300         }
301     }
302
303     *d = '\0';
304
305     return 1;
306 }
307
308 /**
309  * Detect a relative path and merge it with the collector root
310  * path. Leave absolute paths as they are.
311  */
312 static const char *file_path(const char *path)
313 {
314     char *newpath = NULL;
315     apr_status_t rc;
316
317     if (path == NULL) return NULL;
318
319     rc = apr_filepath_merge(&newpath, collector_root, path, APR_FILEPATH_TRUENAME, pool);
320     if ((newpath != NULL) && (rc == APR_SUCCESS || APR_STATUS_IS_EPATHWILD(rc)
321         || APR_STATUS_IS_ENOENT(rc) || APR_STATUS_IS_ENOTDIR(rc)))
322     {
323         return newpath;
324     }
325     else {
326         return NULL;
327     }
328 }
329
330
331 /**
332  * Returns the current datetime as a string.
333  */
334 static char *current_logtime(char *dest, int dlen)
335 {
336     apr_time_exp_t t;
337     apr_size_t len;
338
339     apr_time_exp_lt(&t, apr_time_now());
340     apr_strftime(dest, &len, dlen, "%a %b %d %H:%M:%S %Y", &t);
341
342     return dest;
343 }
344
345
346 /**
347  * Logs error to the error log (if available) or
348  * to the stderr.
349  */
350 static void error_log(int level, void *thread, const char *text, ...)
351 {
352     char msg1[4096] = "";
353     char msg2[4096] = "";
354     char datetime[100];
355     va_list ap;
356
357     if (level > error_log_level) return;
358
359     va_start(ap, text);
360
361     apr_vsnprintf(msg1, sizeof(msg1), text, ap);
362     apr_snprintf(msg2, sizeof(msg2), "[%s] [%d] [%" APR_PID_T_FMT "/%pp] %s\n", current_logtime(datetime, sizeof(datetime)), level, logc_pid, (thread ? thread : 0), msg1);
363
364     if (error_log_fd != NULL) {
365         apr_size_t nbytes_written;
366         apr_size_t nbytes = strlen(msg2);
367         apr_file_write_full(error_log_fd, msg2, nbytes, &nbytes_written);
368     }
369     else {
370         fprintf(stderr, "%s", msg2);
371     }
372
373     va_end(ap);
374 }
375
376
377 /**
378  * Adds one entry to the internal queue. It will (optionally) start
379  * a new thread to handle it.
380  */
381 static void add_entry(const char *data, int start_worker)
382 {
383     entry_t *entry = NULL;
384
385     entry = (entry_t *)malloc(sizeof(entry_t));
386     entry->id = 0;
387     entry->line = strdup(data);
388     entry->line_size = strlen(entry->line);
389
390     error_log(LOG_DEBUG, NULL, "Queue locking thread mutex.");
391     if (APR_STATUS_IS_EBUSY(apr_thread_mutex_trylock(mutex))) {
392         error_log(LOG_DEBUG, NULL, "Queue waiting on thread mutex.");
393         apr_thread_mutex_lock(mutex);
394     }
395
396     /* Assign unique ID to this log entry. */
397     entry->id = entry_counter++;
398
399     /* Add the new audit log entry to the queue. */
400     *(entry_t **)apr_array_push(queue) = entry;
401
402     /* Create a new worker if we can, but not if there is a known problem with the server. */
403     if ((start_worker != 0)&&(current_workers < max_connections)&&(server_error == 0)) {
404         create_new_worker(0);
405     }
406
407     error_log(LOG_DEBUG, NULL, "Queue unlocking thread mutex.");
408     apr_thread_mutex_unlock(mutex);
409 }
410
411
412 /**
413  * Read the queue entries.
414  */
415 static int read_queue_entries(apr_file_t *fd, apr_time_t *queue_time)
416 {
417     char linebuf[4100];
418     int line_count = -1;
419
420     for(;;) {
421         apr_status_t rc = apr_file_gets(linebuf, 4096, fd);
422         char *p;
423
424         if (rc == APR_EOF) break;
425         if (rc != APR_SUCCESS) {
426             error_log(LOG_ERROR, NULL, "Error reading from the queue file.");
427             logc_shutdown(1);
428         }
429
430         if (line_count < 0) {
431             /* First line contains the queue time. */
432             *queue_time = (apr_time_t)apr_atoi64(linebuf);
433             line_count = 0;
434             continue;
435         }
436
437         p = &linebuf[0];
438
439         /* Remove the \n from the end of the line. */
440         while(*p != '\0') {
441             if (*p == '\n') {
442                 *p = '\0';
443                 break;
444             }
445             p++;
446         }
447
448         if (linebuf[0] == '#') { /* Ignore comments. */
449             continue;
450         }
451
452         add_entry((const char *)&linebuf, 0);
453
454         line_count++;
455     }
456
457     apr_file_close(fd);
458
459     return line_count;
460 }
461
462
463 /**
464  * Initialise the transaction log. This code should be
465  * executed only once at startup.
466  */
467 static void transaction_log_init(void)
468 {
469     /* ENH: These big enough? */
470     char new_queue_path[256];
471     char old_queue_path[256];
472     apr_file_t *queue_fd = NULL;
473     apr_time_t queue_time;
474
475     apr_snprintf(new_queue_path, sizeof(new_queue_path), "%s.new", queue_path);
476     apr_snprintf(old_queue_path, sizeof(old_queue_path), "%s.old", queue_path);
477
478     /* Put a lock in place to ensure exclusivity. */
479     error_log(LOG_DEBUG, NULL, "Transaction initialization locking global mutex.");
480     if (APR_STATUS_IS_EBUSY(apr_global_mutex_trylock(gmutex))) {
481         error_log(LOG_DEBUG, NULL, "Transaction initialization waiting on global mutex.");
482         apr_global_mutex_lock(gmutex);
483     }
484
485     error_log(LOG_DEBUG, NULL, "Transaction initialization started.");
486
487     /* Delete .new file if there is one. */
488     apr_file_remove(new_queue_path, pool);
489
490     /* Read in the data from the queue. */
491     if (apr_file_open(&queue_fd, queue_path, APR_READ | APR_FILE_NOCLEANUP,
492         0, pool) == APR_SUCCESS)
493     {
494         int line_count = read_queue_entries(queue_fd, &queue_time);
495
496         apr_file_close(queue_fd);
497
498         if (line_count > 0) {
499             error_log(LOG_NOTICE, NULL, "Loaded %d entries from the queue file.", line_count);
500         }
501     }
502     /* Try the old queue file. */
503     else if (apr_file_open(&queue_fd, old_queue_path, APR_READ | APR_FILE_NOCLEANUP,
504         0, pool) == APR_SUCCESS)
505     {
506         int line_count = read_queue_entries(queue_fd, &queue_time);
507         apr_file_close(queue_fd);
508         error_log(LOG_NOTICE, NULL, "Loaded %d entries from the OLD queue file.", line_count);
509         apr_file_rename(old_queue_path, queue_path, pool);
510     }
511     else {
512         error_log(LOG_NOTICE, NULL, "Queue file not found. New one will be created.");
513     }
514
515     /* Delete the old queue file. */
516     apr_file_remove(old_queue_path, pool);
517
518     checkpoint_time_last = apr_time_now();
519
520     /* Start fresh with the transaction log. Do note that
521      * we do not truncate the transaction log on purpose. Apache
522      * will start copies of piped logging binaries during configuration
523      * testing. Truncating would erase the log of a currently running
524      * instance.
525      */
526     if (apr_file_open(&transaction_log_fd, transaction_log_path, APR_WRITE | APR_CREATE
527         | APR_APPEND | APR_XTHREAD, APR_OS_DEFAULT, pool) != APR_SUCCESS)
528     {
529         error_log(LOG_ERROR, NULL, "Failed to open the transaction log: %s\n", transaction_log_path);
530         error_log(LOG_DEBUG, NULL, "Transaction initialization unlocking global mutex.");
531         apr_global_mutex_unlock(gmutex);
532         logc_shutdown(1);
533     }
534
535     error_log(LOG_DEBUG, NULL, "Transaction initialization completed.");
536
537     /* Unlock */
538     error_log(LOG_DEBUG, NULL, "Transaction initialization unlocking global mutex.");
539     apr_global_mutex_unlock(gmutex);
540 }
541
542
543 /**
544  * Log entry event (incoming or outgoing) to the transaction log.
545  */
546 static void transaction_log(int direction, const char *entry)
547 {
548     apr_size_t nbytes, nbytes_written;
549     char msg[8196] = "";
550
551     apr_snprintf(msg, sizeof(msg), "%u %s: %s\n", (unsigned int)apr_time_sec(apr_time_now()),
552         (direction == TXIN ? "IN" : "OUT"), entry);
553     nbytes = strlen(msg);
554     apr_file_write_full(transaction_log_fd, msg, nbytes, &nbytes_written);
555 }
556
557
558 /**
559  * Executes a checkpoint, which causes the current queue to be
560  * written to a file and the transaction log to be truncated.
561  */
562 static void transaction_checkpoint(void)
563 {
564     /* ENH: These big enough? */
565     char new_queue_path[256];
566     char old_queue_path[256];
567     apr_file_t *queue_fd = NULL;
568     apr_hash_index_t *hi = NULL;
569     char msg[256];
570     int i;
571     apr_pool_t *cpool;
572
573     apr_snprintf(new_queue_path, sizeof(new_queue_path), "%s.new", queue_path);
574     apr_snprintf(old_queue_path, sizeof(old_queue_path), "%s.old", queue_path);
575     apr_snprintf(msg, sizeof(msg), "%u\n", (unsigned int)apr_time_sec(apr_time_now()));
576
577     if (! have_read_data) {
578         error_log(LOG_DEBUG, NULL, "Checkpoint not required.");
579         return;
580     }
581
582     /* Put a lock in place to ensure exclusivity. */
583     error_log(LOG_DEBUG, NULL, "Checkpoint locking global mutex.");
584     if (APR_STATUS_IS_EBUSY(apr_global_mutex_trylock(gmutex))) {
585         error_log(LOG_DEBUG, NULL, "Checkpoint waiting on global mutex.");
586         apr_global_mutex_lock(gmutex);
587     }
588
589     error_log(LOG_DEBUG, NULL, "Checkpoint started.");
590
591     apr_pool_create(&cpool, NULL);
592
593     /* Dump active entries into a new queue file. */
594     if (apr_file_open(&queue_fd, new_queue_path, APR_WRITE | APR_CREATE
595         | APR_EXCL | APR_TRUNCATE | APR_FILE_NOCLEANUP, APR_OS_DEFAULT, cpool) != APR_SUCCESS)
596     {
597         error_log(LOG_ERROR, NULL, "Failed to create file: %s", new_queue_path);
598         error_log(LOG_DEBUG, NULL, "Checkpoint unlocking global mutex.");
599         apr_pool_destroy(cpool);
600         apr_global_mutex_unlock(gmutex);
601         return;
602     }
603
604     /* Write the time first. */
605     apr_file_write_full(queue_fd, msg, strlen(msg), NULL);
606
607     /* Dump the entries sitting in the queue first. */
608     for (i = 0; i < queue->nelts; i++) {
609         entry_t *entry = ((entry_t **)queue->elts)[i];
610         apr_file_write_full(queue_fd, entry->line, entry->line_size, NULL);
611         apr_file_write_full(queue_fd, &"\n", 1, NULL);
612     }
613     error_log(LOG_DEBUG2, NULL, "Checkpoint wrote %d queued entries to new queue.", i);
614
615     /* Then dump the ones that are currently being processed. */
616     i = 0;
617     for (hi = apr_hash_first(NULL, in_progress); hi != NULL; hi = apr_hash_next(hi)) {
618         void *e;
619         entry_t *entry = NULL;
620
621         i++;
622         apr_hash_this(hi, NULL, NULL, &e);
623         entry = e; /* quiet type-punned warning */
624         apr_file_write_full(queue_fd, entry->line, entry->line_size, NULL);
625         apr_file_write_full(queue_fd, &"\n", 1, NULL);
626     }
627     error_log(LOG_DEBUG2, NULL, "Checkpoint wrote %d additional entries to new queue.", i);
628
629     apr_file_close(queue_fd);
630
631     /* Switch the files and truncate the transaction log file. */
632     apr_file_remove(old_queue_path, cpool);
633     apr_file_rename(queue_path, old_queue_path, cpool);
634     apr_file_rename(new_queue_path, queue_path, cpool);
635     apr_file_remove(old_queue_path, cpool);
636     apr_file_trunc(transaction_log_fd, 0);
637
638     error_log(LOG_DEBUG, NULL, "Checkpoint completed.");
639
640     apr_pool_destroy(cpool);
641
642     /* Unlock and exit. */
643     error_log(LOG_DEBUG, NULL, "Checkpoint unlocking global mutex.");
644     apr_global_mutex_unlock(gmutex);
645 }
646
647
648 /**
649  * Parse one confguration line and add it to the
650  * configuration table.
651  */
652 static void parse_configuration_line(const char *line, int line_count)
653 {
654     char *start = NULL, *command = NULL;
655     char *p = NULL;
656
657     /* Remove the trailing newline character. */
658     p = (char *)line;
659     while(*p != '\0') p++;
660     if ((p > start)&&(*(p - 1) == '\n')) *(p - 1) = '\0';
661
662     p = (char *)line;
663     /* Ignore whitespace at the beginning of the line. */
664     while(apr_isspace(*p)) p++;
665
666     /* Ignore empty lines and comments. */
667     if ((*p == '\0')||(*p == '#')) return;
668
669     start = p;
670     while(!apr_isspace(*p)&&(*p != '\0')) p++;
671
672     command = apr_pstrmemdup(pool, start, p - start);
673
674     while(apr_isspace(*p)) p++;
675
676     /* Remove whitespace at the end. */
677     start = p;
678     while(*p != '\0') p++;
679     if (p > start) {
680         p--;
681         while(apr_isspace(*p)) {
682             *p-- = '\0';
683         }
684     }
685
686     /* Remove quotes, but only if we have matching */
687     if ((*start == '"') && (p > start) && (*p == '"')) {
688         start++;
689         *p-- = '\0';
690     }
691
692     /* Take the last directive */
693     /* ENH: Error on dup directives? */
694     apr_table_set(conf, command, start);
695 }
696
697
698 /**
699  * Reads configuration from a file.
700  */
701 static void read_configuration(void)
702 {
703     char linebuf[4096];
704     apr_status_t rc;
705     apr_file_t *fd;
706     int line_count;
707
708     conf = apr_table_make(pool, 32);
709     if (conf == NULL) {
710         error_log(LOG_ERROR, NULL, MEMALLOC_ERROR_MSG);
711         logc_shutdown(1);
712     }
713
714     rc = apr_file_open(&fd, conffile, APR_READ | APR_FILE_NOCLEANUP, 0, pool);
715     if (rc != APR_SUCCESS) {
716         error_log(LOG_ERROR, NULL, "Unable to open configuration file: %s", conffile);
717         logc_shutdown(1);
718     }
719
720     line_count = 0;
721     for(;;) {
722         rc = apr_file_gets(linebuf, 4096, fd);
723         if (rc == APR_EOF) return;
724         if (rc != APR_SUCCESS) {
725             error_log(LOG_ERROR, NULL, "Error reading from the configuration file.");
726             logc_shutdown(1);
727         }
728
729         line_count++;
730         parse_configuration_line(linebuf, line_count);
731     }
732
733     apr_file_close(fd);
734 }
735
736
737 /**
738  * Initialize the configuration.
739  */
740 static void init_configuration(void)
741 {
742     char errstr[1024];
743     apr_status_t rc = 0;
744     const char *s = NULL;
745
746     /* Other values may be based off the collector root. */
747     s = apr_table_get(conf, "CollectorRoot");
748     if (s != NULL) {
749         collector_root = s;
750     }
751
752     /* Error Log */
753     s = apr_table_get(conf, "ErrorLog");
754     if (s != NULL) {
755         error_log_path = file_path(s);
756     }
757
758     s = apr_table_get(conf, "ErrorLogLevel");
759     if (s != NULL) {
760         error_log_level = atoi(s);
761     }
762
763     if ((rc = apr_file_open(&error_log_fd, error_log_path, APR_WRITE | APR_CREATE | APR_APPEND,
764         APR_OS_DEFAULT, pool)) != APR_SUCCESS)
765     {
766         error_log(LOG_ERROR, NULL, "Failed to open the error log %s: %s\n",
767             error_log_path, apr_strerror(rc, errstr, 1024));
768         logc_shutdown(1);
769     }
770
771     error_log(LOG_NOTICE, NULL, "Configuring ModSecurity Audit Log Collector %s.", VERSION);
772
773     /* Startup Delay */
774     s = apr_table_get(conf, "StartupDelay");
775     if (s != NULL) {
776         startup_delay = atoi(s);
777     }
778
779     if ( startup_delay > 0 ) {
780         error_log(LOG_NOTICE, NULL, "Delaying execution for %dms.", startup_delay);
781         apr_sleep(startup_delay * 1000);
782         error_log(LOG_DEBUG, NULL, "Continuing execution after %dms delay.", startup_delay);
783     }
784
785     /* Remaining Configuration */
786
787     error_log(LOG_DEBUG2, NULL, "CollectorRoot=%s", collector_root);
788     error_log(LOG_DEBUG2, NULL, "ErrorLog=%s", error_log_path);
789     error_log(LOG_DEBUG2, NULL, "ErrorLogLevel=%d", error_log_level);
790     error_log(LOG_DEBUG2, NULL, "StartupDelay=%d", startup_delay);
791
792     s = apr_table_get(conf, "CheckpointInterval");
793     if (s != NULL) {
794         checkpoint_interval = atoi(s);
795         error_log(LOG_DEBUG2, NULL, "CheckpointInterval=%d", checkpoint_interval);
796     }
797
798     s = apr_table_get(conf, "QueuePath");
799     if (s != NULL) {
800         queue_path = file_path(s);
801         error_log(LOG_DEBUG2, NULL, "QueuePath=%s", queue_path);
802     }
803     else {
804         error_log(LOG_ERROR, NULL, "QueuePath not defined in the configuration file.");
805         logc_shutdown(1);
806     }
807
808     s = apr_table_get(conf, "LockFile");
809     if (s != NULL) {
810         lockfile = file_path(s);
811         error_log(LOG_DEBUG2, NULL, "LockFile=%s", lockfile);
812     }
813
814     s = apr_table_get(conf, "ServerErrorTimeout");
815     if (s != NULL) {
816         server_error_timeout = atoi(s);
817         error_log(LOG_DEBUG2, NULL, "ServerErrorTimeout=%d", server_error_timeout);
818     }
819
820     s = apr_table_get(conf, "TransactionDelay");
821     if (s != NULL) {
822         transaction_delay = atoi(s);
823         error_log(LOG_DEBUG2, NULL, "TransactionDelay=%d", transaction_delay);
824     }
825
826     s = apr_table_get(conf, "TransactionLog");
827     if (s != NULL) {
828         transaction_log_path = file_path(s);
829         error_log(LOG_DEBUG2, NULL, "TransactionLog=%s", transaction_log_path);
830     }
831
832     s = apr_table_get(conf, "MaxConnections");
833     if (s != NULL) {
834         int v = atoi(s);
835         if (v >= 0) max_connections = v;
836         error_log(LOG_DEBUG2, NULL, "MaxConnections=%d", max_connections);
837     }
838
839     s = apr_table_get(conf, "MaxWorkerRequests");
840     if (s != NULL) {
841         int v = atoi(s);
842         if (v >= 0) max_worker_requests = v;
843         error_log(LOG_DEBUG2, NULL, "MaxWorkerRequests=%d", max_worker_requests);
844     }
845
846     s = apr_table_get(conf, "KeepAlive");
847     if (s != NULL) {
848         int v = atoi(s);
849         if (v >= 0) keep_alive = v;
850         error_log(LOG_DEBUG2, NULL, "KeepAlive=%d", keep_alive);
851     }
852
853     s = apr_table_get(conf, "KeepAliveTimeout");
854     if (s != NULL) {
855         int v = atoi(s);
856         if (v >= 0) keep_alive_timeout = v;
857         error_log(LOG_DEBUG2, NULL, "KeepAliveTimeout=%d", keep_alive_timeout);
858     }
859
860     s = apr_table_get(conf, "LogStorageDir");
861     if (s != NULL) {
862         log_repository = file_path(s);
863         error_log(LOG_DEBUG2, NULL, "LogStorageDir=%s", log_repository);
864     }
865     else {
866         error_log(LOG_ERROR, NULL, "Missing mandatory parameter LogStorageDir.\n");
867         logc_shutdown(1);
868     }
869
870     s = apr_table_get(conf, "ConsoleURI");
871     if (s != NULL) {
872         console_uri = s;
873         error_log(LOG_DEBUG2, NULL, "ConsoleURI=%s", console_uri);
874     }
875     else {
876         error_log(LOG_ERROR, NULL, "Missing mandatory parameter ConsoleURI.\n");
877         logc_shutdown(1);
878     }
879
880     s = apr_table_get(conf, "SensorUsername");
881     if (s != NULL) {
882         sensor_username = s;
883         error_log(LOG_DEBUG2, NULL, "SensorUsername=%s", sensor_username);
884     }
885     else {
886         error_log(LOG_ERROR, NULL, "Missing mandatory parameter SensorUsername.\n");
887         logc_shutdown(1);
888     }
889
890     s = apr_table_get(conf, "SensorPassword");
891     if (s != NULL) {
892         sensor_password = s;
893         error_log(LOG_DEBUG2, NULL, "SensorPassword=%s", sensor_password);
894     }
895     else {
896         error_log(LOG_ERROR, NULL, "Missing mandatory parameter SensorPassword.\n");
897         logc_shutdown(1);
898     }
899
900     s = apr_table_get(conf, "KeepEntries");
901     if (s != NULL) {
902         keep_entries = atoi(s);
903     }
904     else {
905         keep_entries = 0;
906     }
907     error_log(LOG_DEBUG2, NULL, "KeepEntries=%d", keep_entries);
908 }
909
910
911 /**
912  * Clean-up resources before process shutdown.
913  */
914 static void logc_cleanup(void)
915 {
916     curl_global_cleanup();
917 }
918
919
920 /**
921  * Shutdown the logger.
922  */
923 static void logc_shutdown(int rc)
924 {
925     /* Tell the threads to shut down. */
926     running = 0;
927
928     error_log(LOG_DEBUG, NULL, "Shutting down");
929
930     /* Wait for the management thread to stop */
931     /* ENH: Need a fixed timeout if this never happens */
932     while(management_thread_active != 0) {
933         apr_sleep(10 * 1000);
934     }
935
936     if (rc == 0) {
937         error_log(LOG_NOTICE, NULL, "ModSecurity Audit Log Collector %s terminating normally.", VERSION);
938     }
939     else {
940         error_log(LOG_NOTICE, NULL, "ModSecurity Audit Log Collector %s terminating with error %d", VERSION, rc);
941     }
942
943     if (error_log_fd != NULL) {
944         apr_file_flush(error_log_fd);
945     }
946
947     exit(rc);
948 }
949
950
951 /**
952  * Handle signals.
953  */
954 static int handle_signals(int signum)
955 {
956     switch (signum) {
957     case SIGINT:
958         error_log(LOG_NOTICE, NULL, "Caught SIGINT, shutting down.");
959         logc_shutdown(0);
960     case SIGTERM:
961         error_log(LOG_NOTICE, NULL, "Caught SIGTERM, shutting down.");
962         logc_shutdown(0);
963 #ifndef WIN32
964     case SIGHUP:
965         error_log(LOG_NOTICE, NULL, "Caught SIGHUP, ignored.");
966         /* ENH: reload config? */
967         return 0;
968     case SIGALRM:
969         error_log(LOG_DEBUG, NULL, "Caught SIGALRM, ignored.");
970         return 0;
971     case SIGTSTP:
972         error_log(LOG_DEBUG, NULL, "Caught SIGTSTP, ignored.");
973         return 0;
974 #endif /* WIN32 */
975     }
976 #ifndef WIN32
977     error_log(LOG_NOTICE, NULL, "Caught unexpected signal %d: %s", signum, apr_signal_description_get(signum));
978 #else
979     error_log(LOG_NOTICE, NULL, "Caught unexpected signal %d", signum);
980 #endif /* WIN32 */
981     logc_shutdown(1);
982
983     return 0; /* should never reach */
984 }
985
986
987 /**
988  * This function is invoked by Curl to read the response
989  * body. Since we don't care about the response body the function
990  * pretends it is retrieving data where it isn't.
991  */
992 size_t curl_writefunction(void *ptr, size_t size, size_t nmemb, void *stream)
993 {
994     unsigned char *data = (unsigned char *)ptr;
995     unsigned char *status = (unsigned char *)stream;
996
997     /* Grab the status line text from the first line of output */
998     if ((status[0] == 0) && (status[1] == 1)) {
999         apr_size_t i, j;
1000         int ismsg = 0;
1001
1002         status[1] = 0; /* reset hidden init flag */
1003
1004         for (i = 0, j = 0; i < STATUSBUF_SIZE; i++) {
1005             /* We found a line ending so we are done */
1006             if ( data[i] == '\r' ) {
1007                 break;
1008             }
1009             /* Skip to after the first space (where msg is) */
1010             if (ismsg < 3) {
1011                 if ((ismsg == 1) && !isspace(data[i])) {
1012                     ismsg++;
1013                 }
1014                 else if (isspace(data[i])) {
1015                     ismsg++;
1016                 }
1017                 continue;
1018             }
1019
1020             /* Copy data (msg) from data to status */
1021             status[j++] = data[i];
1022         }
1023         status[j] = '\0';
1024         urldecode_inplace(status, j);
1025     }
1026
1027     /* do nothing */
1028     return (size * nmemb);
1029 }
1030
1031
1032 /**
1033  * This function is invoked by Curl whenever it has something
1034  * to say. We forward its messages to the error log at level
1035  * DEBUG.
1036  */
1037 int curl_debugfunction(CURL *curl, curl_infotype infotype, char *data, size_t datalen, void *ourdata)
1038 {
1039     apr_size_t i, effectivelen;
1040     apr_thread_t *thread = (apr_thread_t *)ourdata;
1041
1042     if (error_log_level < LOG_DEBUG) return 0;
1043
1044     effectivelen = datalen;
1045     for(i = 0; i < datalen; i++) {
1046         if ((data[i] == 0x0a)||(data[i] == 0x0d)) {
1047             effectivelen = i;
1048             break;
1049         }
1050     }
1051
1052     if (error_log_level >= LOG_DEBUG) {
1053         if (infotype == CURLINFO_TEXT) {
1054             error_log(LOG_DEBUG, thread, "CURL: %s", _log_escape(apr_thread_pool_get(thread), data, effectivelen));
1055         }
1056     }
1057     if (error_log_level >= LOG_DEBUG2) {
1058         if (infotype == CURLINFO_HEADER_IN) {
1059             error_log(LOG_DEBUG2, thread, "CURL: HEADER_IN %s", _log_escape(apr_thread_pool_get(thread), data, effectivelen));
1060         }
1061         else if (infotype == CURLINFO_HEADER_OUT) {
1062             error_log(LOG_DEBUG2, thread, "CURL: HEADER_OUT %s", _log_escape(apr_thread_pool_get(thread), data, effectivelen));
1063         }
1064         else if (infotype == CURLINFO_DATA_IN) {
1065             error_log(LOG_DEBUG2, thread, "CURL: DATA_IN %s", _log_escape(apr_thread_pool_get(thread), data, effectivelen));
1066         }
1067         else if (infotype == CURLINFO_DATA_OUT) {
1068             error_log(LOG_DEBUG2, thread, "CURL: DATA_OUT %s", _log_escape(apr_thread_pool_get(thread), data, effectivelen));
1069         }
1070     }
1071
1072     return 0;
1073 }
1074
1075
1076 /**
1077  * Initialise the necessary resources and structures.
1078  */
1079 static void logc_init(void)
1080 {
1081     char errstr[1024];
1082     apr_status_t rc = 0;
1083     const char *errptr = NULL;
1084     int i, erroffset;
1085
1086     queue = apr_array_make(pool, 64, sizeof(entry_t *));
1087     if (queue == NULL) {
1088         error_log(LOG_ERROR, NULL, MEMALLOC_ERROR_MSG);
1089         logc_shutdown(1);
1090     }
1091
1092     in_progress = apr_hash_make(pool);
1093     if (in_progress == NULL) {
1094         error_log(LOG_ERROR, NULL, MEMALLOC_ERROR_MSG);
1095         logc_shutdown(1);
1096     }
1097
1098     if ((rc = apr_global_mutex_create(&gmutex, lockfile, APR_LOCK_DEFAULT, pool)) != APR_SUCCESS) {
1099         error_log(LOG_ERROR, NULL, "Failed to create global mutex: %s",
1100             apr_strerror(rc, errstr, 1024));
1101         logc_shutdown(1);
1102     }
1103
1104     if ((rc = apr_thread_mutex_create(&mutex, APR_THREAD_MUTEX_UNNESTED, pool)) != APR_SUCCESS) {
1105         error_log(LOG_ERROR, NULL, "Failed to create thread mutex: %s",
1106             apr_strerror(rc, errstr, 1024));
1107         logc_shutdown(1);
1108     }
1109
1110     entry_counter = 1;
1111
1112     curl_handles = apr_array_make(pool, max_connections, sizeof(CURL *));
1113     if (curl_handles == NULL) {
1114         error_log(LOG_ERROR, NULL, MEMALLOC_ERROR_MSG);
1115         logc_shutdown(1);
1116     }
1117
1118     /* Initialise a number of Curl handles. */
1119     for(i = 0; i < max_connections; i++) {
1120         CURL *curl = NULL;
1121
1122         /* Create cURL handle. */
1123         curl = curl_easy_init();
1124
1125         /* Pre-configure the handle. */
1126         curl_easy_setopt(curl, CURLOPT_UPLOAD, TRUE);
1127         curl_easy_setopt(curl, CURLOPT_ERRORBUFFER, (char *)NULL);
1128         curl_easy_setopt(curl, CURLOPT_URL, console_uri);
1129         curl_easy_setopt(curl, CURLOPT_HTTPAUTH, CURLAUTH_BASIC);
1130         curl_easy_setopt(curl, CURLOPT_SSL_VERIFYPEER, FALSE);
1131         curl_easy_setopt(curl, CURLOPT_SSL_VERIFYHOST, 0);
1132         /* SSLv3 works better overall as some servers have issues with TLS */
1133         curl_easy_setopt(curl, CURLOPT_SSLVERSION, CURL_SSLVERSION_SSLv3);
1134         curl_easy_setopt(curl, CURLOPT_CONNECTTIMEOUT, 15);
1135         curl_easy_setopt(curl, CURLOPT_NOSIGNAL, TRUE);
1136         curl_easy_setopt(curl, CURLOPT_HEADER, TRUE);
1137
1138         curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, curl_writefunction);
1139
1140         *(CURL **)apr_array_push(curl_handles) = curl;
1141     }
1142
1143     logline_regex = pcre_compile(logline_pattern, PCRE_CASELESS, &errptr, &erroffset, NULL);
1144     if (logline_regex == NULL) {
1145         error_log(LOG_ERROR, NULL, "Failed to compile pattern: %s\n", logline_pattern);
1146         logc_shutdown(1);
1147     }
1148
1149     requestline_regex = pcre_compile(requestline_pattern, PCRE_CASELESS, &errptr, &erroffset, NULL);
1150     if (requestline_regex == NULL) {
1151         error_log(LOG_ERROR, NULL, "Failed to compile pattern: %s\n", requestline_pattern);
1152         logc_shutdown(1);
1153     }
1154 }
1155
1156
1157 /**
1158  * HACK: To allow two mlogcs running against a single dataset we use the
1159  * mtime as a flag for deletion.
1160  *
1161  *  1) Check file date.
1162  *  2) If it is KEEP_ENTRIES_REMOVE_TIME, then remove the file.
1163  *  3) Otherwise set the date and let the other mlogc remove it.
1164  */
1165 static void keep_entries_hack(apr_pool_t *mp, apr_thread_t *thread, const char *fn)
1166 {
1167     apr_file_t *f = NULL;
1168     apr_finfo_t finfo;
1169     char errstr[1024];
1170     apr_status_t rc;
1171
1172     /* Opening for write as required for exclusive lock */
1173     if ((rc = apr_file_open(&f, fn, APR_READ|APR_WRITE|APR_APPEND, APR_OS_DEFAULT, mp)) != APR_SUCCESS) {
1174         error_log(LOG_ERROR, thread, "Could not open \"%s\": %s", fn, apr_strerror(rc, errstr, 1024));
1175         return;
1176     }
1177
1178     if ((rc = apr_file_lock(f, APR_FLOCK_EXCLUSIVE|APR_FLOCK_NONBLOCK)) != APR_SUCCESS) {
1179         error_log(LOG_DEBUG2, thread, "Waiting for lock on \"%s\": %s", fn, apr_strerror(rc, errstr, 1024));
1180         if ((rc = apr_file_lock(f, APR_FLOCK_EXCLUSIVE)) != APR_SUCCESS) {
1181             error_log(LOG_ERROR, thread, "Could not lock \"%s\": %s", fn, apr_strerror(rc, errstr, 1024));
1182             apr_file_close(f);
1183             return;
1184         }
1185     }
1186     error_log(LOG_DEBUG2, thread, "Locked: %s", fn);
1187
1188     /* For testing only */
1189     TEST_WITH_RAND_SLEEP(2);
1190
1191     if ((rc = apr_stat(&finfo, fn, APR_FINFO_MIN, mp)) != APR_SUCCESS) {
1192         error_log(LOG_ERROR, thread, "Could not stat \"%s\": %s", fn, apr_strerror(rc, errstr, 1024));
1193         error_log(LOG_DEBUG2, thread, "Unlocked: %s", fn);
1194         apr_file_close(f);
1195         return;
1196     }
1197
1198     if (error_log_level >= LOG_DEBUG) {
1199         error_log(LOG_DEBUG, thread, "STAT \"%s\" {uid=%d; gid=%d; size=%" APR_OFF_T_FMT "; csize=%" APR_OFF_T_FMT "; atime=%" APR_TIME_T_FMT "; ctime=%" APR_TIME_T_FMT "; mtime=%" APR_TIME_T_FMT "}", fn, finfo.user, finfo.group, finfo.size, finfo.csize, finfo.atime, finfo.ctime, finfo.mtime);
1200     }
1201
1202     if (finfo.mtime != KEEP_ENTRIES_REMOVE_TIME) {
1203         error_log(LOG_DEBUG2, thread, "Set mtime: %s", fn);
1204         if ((rc = apr_file_mtime_set(fn, (apr_time_t)KEEP_ENTRIES_REMOVE_TIME, mp)) != APR_SUCCESS) {
1205             error_log(LOG_ERROR, thread, "Could not set mtime on \"%s\": %s", fn, apr_strerror(rc, errstr, 1024));
1206         }
1207         error_log(LOG_DEBUG2, thread, "Unlocked: %s", fn);
1208         apr_file_close(f);
1209         return;
1210     }
1211
1212
1213     error_log(LOG_DEBUG, thread, "Removing: %s", fn);
1214     error_log(LOG_DEBUG2, thread, "Unlocked: %s", fn);
1215     apr_file_close(f);
1216     apr_file_remove(fn, mp);
1217 }
1218
1219
1220 /**
1221  * Worker thread. Works in a loop, fetching jobs from the queue,
1222  * until the queue is empty or it is otherwise told to quit.
1223  */
1224 static void * APR_THREAD_FUNC thread_worker(apr_thread_t *thread, void *data)
1225 {
1226     unsigned int loop_count = 0;
1227     CURL *curl = (CURL *)data;
1228     entry_t **entryptr = NULL;
1229     entry_t *entry = NULL;
1230     apr_status_t rc;
1231     apr_finfo_t finfo;
1232     int capturevector[CAPTUREVECTORSIZE];
1233     int take_new = 1;
1234     apr_pool_t *tpool;
1235     struct curl_slist *headerlist = NULL;
1236     char curl_error_buffer[CURL_ERROR_SIZE] = "";
1237     int num_requests = 0;
1238
1239     /* There is no need to do the sleep if this was an invalid entry
1240      * as the sleep is just to protect flooding the console server
1241      * with rapid requests.  With an invalid entry we never hit the
1242      * server, so we should not delay processing the next event.
1243      */
1244     int nodelay = 0;
1245
1246
1247     error_log(LOG_DEBUG, thread, "Worker thread starting.");
1248
1249     /* Each worker uses its own pool to manage memory. To avoid
1250      * memory leaks the pool is cleared after each processed
1251      * entry.
1252      */
1253     apr_pool_create(&tpool, thread_pool);
1254
1255     /* Process jobs in a queue until there are no more jobs to process. */
1256     for(;;) {
1257         nodelay = 0;
1258
1259         /* Do we need to shut down? */
1260         if (running == 0) {
1261             error_log(LOG_DEBUG, thread, "We were told to shut down.");
1262             goto THREAD_SHUTDOWN;
1263         }
1264
1265         /* Is there a problem with the server? We need
1266          * to shut down if there is. Except that we don't
1267          * want to shut down if we were launched to investigate
1268          * if the server came back online (loop_count will be
1269          * zero in that case).
1270          */
1271         if ((server_error == 1)&&(loop_count != 0)) {
1272             error_log(LOG_DEBUG, thread, "Shutting down due to server error.");
1273             goto THREAD_SHUTDOWN;
1274         }
1275
1276         loop_count++;
1277
1278         /* Get a new entry, but only if we need one. */
1279         if (take_new) {
1280             error_log(LOG_DEBUG, thread, "Worker fetch locking thread mutex.");
1281             if (APR_STATUS_IS_EBUSY(apr_thread_mutex_trylock(mutex))) {
1282                 error_log(LOG_DEBUG, thread, "Worker fetch waiting on thread mutex.");
1283                 apr_thread_mutex_lock(mutex);
1284             }
1285
1286             error_log(LOG_DEBUG, thread, "Worker fetch started.");
1287
1288             /* Deal with the previous entry. */
1289             if (entry != NULL) {
1290                 error_log(LOG_DEBUG, thread, "Removing previous entry from storage.");
1291                 transaction_log(TXOUT, entry->line);
1292
1293                 /* Remove previous entry from storage. */
1294                 apr_hash_set(in_progress, &entry->id, sizeof(entry->id), NULL);
1295
1296                 /* Release the memory it used to occupy. */
1297                 free((void *)entry->line);
1298                 free(entry);
1299                 entry = NULL;
1300             }
1301
1302             error_log(LOG_DEBUG, thread, "Getting one entry from the queue.");
1303
1304             /* Get one entry. */
1305             entryptr = (entry_t **)apr_array_pop(queue);
1306             if (entryptr == NULL) {
1307                 error_log(LOG_DEBUG, thread, "Worker fetch unlocking thread mutex.");
1308                 apr_thread_mutex_unlock(mutex);
1309                 error_log(LOG_DEBUG, thread, "No more work for this thread, exiting.");
1310
1311                 goto THREAD_SHUTDOWN;
1312             }
1313
1314             entry = *entryptr;
1315             apr_hash_set(in_progress, &entry->id, sizeof(entry->id), entry);
1316
1317             error_log(LOG_DEBUG, thread, "Worker fetch completed.");
1318
1319             error_log(LOG_DEBUG, thread, "Worker fetch unlocking thread mutex.");
1320             apr_thread_mutex_unlock(mutex);
1321         }
1322
1323         /* Send one entry. */
1324
1325         error_log(LOG_DEBUG, thread, "Processing entry.");
1326         take_new = 0;
1327
1328         /* Keep track of requests processed if we need to */
1329         if (max_worker_requests > 0) {
1330             num_requests++;
1331         }
1332
1333         rc = pcre_exec(logline_regex, NULL, entry->line, entry->line_size, 0, 0,
1334             capturevector, CAPTUREVECTORSIZE);
1335         if (rc == PCRE_ERROR_NOMATCH) { /* No match. */
1336             error_log(LOG_WARNING, thread, "Invalid entry (failed to match regex): %s", _log_escape(tpool, entry->line, entry->line_size));
1337             take_new = 1;
1338             nodelay = 1;
1339         }
1340         else if (rc < 0) { /* Error condition. */
1341             error_log(LOG_WARNING, thread, "Invalid entry (PCRE error %d): %s", rc, _log_escape(tpool, entry->line, entry->line_size));
1342             take_new = 1;
1343             nodelay = 1;
1344         }
1345         else { /* We have a match. */
1346             char *uniqueid = NULL;
1347             char *auditlogentry = NULL;
1348             char *hash = NULL;
1349             char *summary = NULL;
1350             char *credentials = NULL;
1351
1352             error_log(LOG_DEBUG, thread, "Regular expression matched.");
1353
1354             /* For testing only */
1355             TEST_WITH_RAND_SLEEP(2);
1356
1357             uniqueid = apr_psprintf(tpool, "%.*s",
1358                 (capturevector[2*13+1] - capturevector[2*13]), (entry->line + capturevector[2*13]));
1359             auditlogentry = apr_psprintf(tpool, "%s/%.*s", log_repository,
1360                 (capturevector[2*15+1] - capturevector[2*15]), (entry->line + capturevector[2*15]));
1361             hash = apr_psprintf(tpool, "X-Content-Hash: %.*s",
1362                 (capturevector[2*18+1] - capturevector[2*15]), (entry->line + capturevector[2*18]));
1363             summary = apr_psprintf(tpool, "X-ForensicLog-Summary: %s", entry->line);
1364             credentials = apr_psprintf(tpool, "%s:%s", sensor_username, sensor_password);
1365
1366             rc = apr_stat(&finfo, auditlogentry, APR_FINFO_SIZE, tpool);
1367             if (rc == APR_SUCCESS) {
1368                 FILE *hd_src;
1369                 char response_buf[STATUSBUF_SIZE];
1370                 CURLcode res;
1371
1372                 if (error_log_level >= LOG_DEBUG) {
1373                     error_log(LOG_DEBUG, thread, "STAT \"%s\" {uid=%d; gid=%d; size=%" APR_OFF_T_FMT "; csize=%" APR_OFF_T_FMT "; atime=%" APR_TIME_T_FMT "; ctime=%" APR_TIME_T_FMT "; mtime=%" APR_TIME_T_FMT "}", auditlogentry, finfo.user, finfo.group, finfo.size, finfo.csize, finfo.atime, finfo.ctime, finfo.mtime);
1374                 }
1375
1376                 /* Initialize the respone buffer with a hidden value */
1377                 response_buf[0] = 0;
1378                 response_buf[1] = 1;
1379
1380                 if (finfo.size == 0) {
1381                     error_log(LOG_WARNING, thread, "File found (%" APR_SIZE_T_FMT " bytes), skipping.", finfo.size);
1382                     take_new = 1;
1383                     nodelay = 1;
1384                     goto THREAD_CLEANUP;
1385                 }
1386                 else {
1387                     error_log(LOG_DEBUG, thread, "File found (%" APR_SIZE_T_FMT " bytes), activating cURL.", finfo.size);
1388                 }
1389
1390
1391                 curl_easy_setopt(curl, CURLOPT_VERBOSE, 1);
1392                 curl_easy_setopt(curl, CURLOPT_DEBUGFUNCTION, curl_debugfunction);
1393                 curl_easy_setopt(curl, CURLOPT_DEBUGDATA, thread);
1394                 curl_easy_setopt(curl, CURLOPT_ERRORBUFFER, curl_error_buffer);
1395                 curl_easy_setopt(curl, CURLOPT_USERPWD, credentials);
1396                 curl_easy_setopt(curl, CURLOPT_WRITEDATA, (char *)response_buf);
1397
1398                 headerlist = curl_slist_append(headerlist, "Expect:");
1399                 headerlist = curl_slist_append(headerlist, hash);
1400                 headerlist = curl_slist_append(headerlist, summary);
1401                 curl_easy_setopt(curl, CURLOPT_HTTPHEADER, headerlist);
1402
1403                 hd_src = fopen(auditlogentry, "rb");
1404                 if (hd_src == NULL) {
1405                     error_log(LOG_WARNING, thread, "Invalid entry (failed to open file for reading): %s", auditlogentry);
1406                     take_new = 1;
1407                     nodelay = 1;
1408                     goto THREAD_CLEANUP;
1409                 }
1410
1411                 curl_easy_setopt(curl, CURLOPT_READDATA, hd_src);
1412                 curl_easy_setopt(curl, CURLOPT_INFILESIZE_LARGE, finfo.size);
1413 #if 0
1414                 mandatory on win32?
1415                 curl_easy_setopt(curl, CURLOPT_READFUNCTION, read_callback);
1416 #endif
1417
1418                 res = curl_easy_perform(curl);
1419
1420                 fclose(hd_src);
1421
1422                 if (res == 0) {
1423                     long response_code = 0;
1424
1425                     res = curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &response_code);
1426                     error_log(LOG_DEBUG, thread, "Request returned with status \"%ld %s\": %s", response_code, response_buf, uniqueid);
1427
1428
1429                     if (response_code == 0) {
1430                         /* Assume problem with connection */
1431                         error_log(LOG_WARNING, thread, "Flagging server as errored after failure to retrieve response code for entry %s (cURL code %d): Possible SSL negotiation error",
1432                             uniqueid, res);
1433                         apr_sleep(1000 * 1000);
1434                         take_new = 0;
1435                         server_error = 1;
1436                         server_error_last_check_time = apr_time_now();
1437                     }
1438                     else if (res != 0) {
1439                         error_log(LOG_WARNING, thread, "Flagging server as errored after failure to retrieve response code for entry %s (cURL code %d): %s",
1440                             uniqueid, res, curl_error_buffer);
1441                         apr_sleep(1000 * 1000);
1442                         take_new = 0;
1443                         server_error = 1;
1444                         server_error_last_check_time = apr_time_now();
1445                     }
1446                     else {
1447                         if (response_code == 200) {
1448                             double total_time, upload_size;
1449
1450                             if (server_error == 1) {
1451                                 error_log(LOG_NOTICE, thread, "Clearing the server error flag after successful entry submission: %s", uniqueid);
1452                             }
1453                             server_error = 0;
1454                             server_error_last_check_time = 0;
1455
1456                             curl_easy_getinfo(curl, CURLINFO_TOTAL_TIME, &total_time);
1457                             curl_easy_getinfo(curl, CURLINFO_SIZE_UPLOAD, &upload_size);
1458
1459                             if (!keep_entries) {
1460                                 error_log(LOG_DEBUG, thread, "Removing: %s", auditlogentry);
1461                                 apr_file_remove(auditlogentry, tpool);
1462                             }
1463                             else if (keep_entries == KEEP_ENTRIES_REMOVE_HACK) {
1464                                 keep_entries_hack(tpool, thread, auditlogentry);
1465                             }
1466
1467                             error_log(LOG_NOTICE, thread, "Entry completed (%.3f seconds, %.0f bytes): %s",
1468                                 total_time, upload_size,
1469                                 uniqueid);
1470                             take_new = 1;
1471                         }
1472                         else if (response_code == 409) {
1473                             /* Assume problem with audit log entry. */
1474                             error_log(LOG_WARNING, thread, "Failed to submit entry with \"409 %s\": %s",
1475                                 response_buf, uniqueid);
1476                             take_new = 1;
1477                         }
1478                         else {
1479                             /* Assume problem with server. */
1480                             error_log(LOG_WARNING, thread, "Flagging server as errored after failure to submit entry %s with HTTP response code %ld: %s",
1481                                 uniqueid, response_code, response_buf);
1482                             server_error = 1;
1483                             server_error_last_check_time = apr_time_now();
1484                             take_new = 0;
1485                         }
1486                     }
1487                 }
1488                 else { /* Something isn't right. */
1489                     error_log(LOG_WARNING, thread, "Flagging server as errored after failure to submit entry %s (cURL code %d): %s", uniqueid, res, curl_error_buffer);
1490                     server_error = 1;
1491                     server_error_last_check_time = apr_time_now();
1492                     take_new = 0;
1493
1494                 }
1495             }
1496             else {
1497                 error_log(LOG_WARNING, thread, "Invalid entry (file not found %d): %s", rc, auditlogentry);
1498                 take_new = 1;
1499                 nodelay = 1;
1500             }
1501
1502             /* If we are tracking num_requests, then shutdown if we are
1503              * over our threshold.
1504              */
1505             if (num_requests && (num_requests >= max_worker_requests)) {
1506                 error_log(LOG_NOTICE, thread, "Reached max requests (%d) for this worker, exiting.", max_worker_requests);
1507
1508                 goto THREAD_SHUTDOWN;
1509             }
1510         }
1511
1512         THREAD_CLEANUP:
1513
1514         /* Sleep if we sent data to the server so we do not flood */
1515         /* ENH: Need to sleep for 1ms in a loop checking for shutdown */
1516         if ((nodelay == 0) && (transaction_delay > 0)) {
1517             error_log(LOG_DEBUG, thread, "Sleeping for %d msec.", transaction_delay);
1518             apr_sleep(transaction_delay * 1000);
1519         }
1520
1521         if (headerlist != NULL) {
1522             curl_slist_free_all(headerlist);
1523             headerlist = NULL;
1524         }
1525
1526         apr_pool_clear(tpool);
1527
1528         error_log(LOG_DEBUG, thread, "Worker processing completed.");
1529     }
1530
1531     THREAD_SHUTDOWN:
1532
1533     error_log(LOG_DEBUG, thread, "Worker shutdown locking thread mutex.");
1534     if (APR_STATUS_IS_EBUSY(apr_thread_mutex_trylock(mutex))) {
1535         error_log(LOG_DEBUG, thread, "Worker shutdown waiting on thread mutex.");
1536         apr_thread_mutex_lock(mutex);
1537     }
1538
1539     /* Deal with the previous entry, if any. */
1540     if (entry != NULL) {
1541         apr_hash_set(in_progress, &entry->id, sizeof(entry->id), NULL);
1542
1543         if (take_new == 0) { /* Not done. */
1544             *(entry_t **)apr_array_push(queue) = entry;
1545         }
1546         else {
1547             transaction_log(TXOUT, entry->line);
1548             free((void *)entry->line);
1549             free(entry);
1550         }
1551
1552         entry = NULL;
1553     }
1554
1555     /* Return curl handle to the pool for reuse. */
1556     *(CURL **)apr_array_push(curl_handles) = curl;
1557
1558     /* No more work, exit. */
1559     current_workers--;
1560
1561     error_log(LOG_DEBUG, thread, "Worker shutdown unlocking thread mutex.");
1562     apr_thread_mutex_unlock(mutex);
1563
1564     apr_pool_destroy(tpool);
1565
1566     error_log(LOG_DEBUG, thread, "Worker thread completed.");
1567
1568     apr_thread_exit(thread, 0);
1569
1570     return NULL;
1571 }
1572
1573
1574 /**
1575  * Creates one new worker, giving it one of the available
1576  * Curl handles to work with.
1577  */
1578 static void create_new_worker(int lock)
1579 {
1580     apr_thread_t *thread = NULL;
1581     CURL **curlptr = NULL;
1582
1583     if (lock) {
1584         error_log(LOG_DEBUG, NULL, "Worker creation locking thread mutex.");
1585         if (APR_STATUS_IS_EBUSY(apr_thread_mutex_trylock(mutex))) {
1586             error_log(LOG_DEBUG, NULL, "Worker creation waiting on thread mutex.");
1587             apr_thread_mutex_lock(mutex);
1588         }
1589     }
1590
1591     error_log(LOG_DEBUG, NULL, "Worker creation started.");
1592
1593     /* A sanity check: this part executes under lock and
1594      * we want to make *sure* we don't create more threads
1595      * than we are allowed.
1596      */
1597     if (current_workers >= max_connections) {
1598         if (lock) {
1599             error_log(LOG_DEBUG, NULL, "Worker creation unlocking thread mutex.");
1600             apr_thread_mutex_unlock(mutex);
1601         }
1602         return;
1603     }
1604
1605     /* Cleanup thread pool when idle */
1606     if (current_workers <= 0)  {
1607         if (thread_pool != NULL) {
1608             error_log(LOG_DEBUG, NULL, "Destroying thread_pool.");
1609             apr_pool_destroy(thread_pool);
1610             thread_pool = NULL;
1611         }
1612         error_log(LOG_DEBUG, NULL, "Creating thread_pool.");
1613         apr_pool_create(&thread_pool, NULL);
1614     }
1615
1616     curlptr = (CURL **)apr_array_pop(curl_handles);
1617     if (curlptr != NULL) {
1618         apr_threadattr_t *thread_attrs;
1619         apr_status_t rc;
1620
1621         apr_threadattr_create(&thread_attrs, thread_pool);
1622         apr_threadattr_detach_set(thread_attrs, 1);
1623         apr_threadattr_stacksize_set(thread_attrs, 1024);
1624
1625         rc = apr_thread_create(&thread, thread_attrs, thread_worker, *curlptr, thread_pool);
1626         if (rc != APR_SUCCESS) {
1627             if (lock) {
1628                 error_log(LOG_DEBUG, NULL, "Worker creation unlocking thread mutex.");
1629                 apr_thread_mutex_unlock(mutex);
1630             }
1631             error_log(LOG_ERROR, NULL, "Failed to create new worker thread: %d", rc);
1632             logc_shutdown(1);
1633         }
1634
1635         current_workers++;
1636     }
1637     else {
1638         if (lock) {
1639             error_log(LOG_DEBUG, NULL, "Worker creation unlocking thread mutex.");
1640             apr_thread_mutex_unlock(mutex);
1641         }
1642         error_log(LOG_ERROR, NULL, "No more cURL handles (Internal Error).");
1643         logc_shutdown(1);
1644     }
1645
1646     error_log(LOG_DEBUG, NULL, "Worker creation completed: %pp", thread);
1647
1648     if (lock) {
1649         error_log(LOG_DEBUG, NULL, "Worker creation unlocking thread mutex.");
1650         apr_thread_mutex_unlock(mutex);
1651     }
1652 }
1653
1654
1655 /**
1656  * This function implements the management thread.
1657  */
1658 static void * APR_THREAD_FUNC thread_manager(apr_thread_t *thread, void *data)
1659 {
1660     apr_time_t last = 0;
1661     apr_time_t now = 0;
1662
1663     error_log(LOG_DEBUG, thread, "Management thread: Starting.");
1664
1665     for(;;) {
1666         now = apr_time_now();
1667
1668         /* Should we stop running? */
1669         if (running == 0) {
1670             /* We need to be last */
1671             error_log(LOG_DEBUG, thread, "Management thread: Waiting for worker threads to finish.");
1672             while(current_workers > 0) {
1673                 apr_sleep(10 * 1000);
1674             }
1675
1676             if (have_read_data) {
1677                 error_log(LOG_NOTICE, thread, "Running final transaction checkpoint.");
1678                 transaction_checkpoint();
1679             }
1680
1681             error_log(LOG_DEBUG, thread, "Management thread: Exiting.");
1682             management_thread_active = 0;
1683             apr_thread_exit(thread, 0);
1684         }
1685
1686         /* Sleep for a while, but wake up often to check running status */
1687         if ((last > 0) && ((now - last) < MANAGER_SLEEP)) {
1688             apr_sleep(MANAGER_SUBSLEEP);
1689             continue;
1690         }
1691         last = now;
1692
1693         error_log(LOG_DEBUG2, thread, "Management thread: Processing");
1694
1695         /* When the server is flagged errored we need to
1696          * create a worker thread from time to time to
1697          * investigate.
1698          */
1699         if (server_error) {
1700             if ((current_workers == 0)&&
1701                 (apr_time_sec(now - server_error_last_check_time) > server_error_timeout))
1702             {
1703                 server_error_last_check_time = now;
1704                 error_log(LOG_DEBUG, thread, "Management thread: Creating worker thread to investigate server.");
1705                 create_new_worker(1);
1706             }
1707         }
1708         else {
1709             if ((current_workers < max_connections)&&(queue->nelts > current_workers)) {
1710                 error_log(LOG_DEBUG, thread, "Management thread: Creating worker thread to catch up with the queue.");
1711                 create_new_worker(1);
1712             }
1713         }
1714
1715         /* Initiate a transaction log checkpoint if enough time passed since the last one. */
1716         if (apr_time_sec(now - checkpoint_time_last) > checkpoint_interval) {
1717             error_log(LOG_DEBUG, thread, "Management thread: Initiating a checkpoint"
1718                 " (previous was %" APR_TIME_T_FMT " seconds ago).", apr_time_sec(now - checkpoint_time_last));
1719             checkpoint_time_last = now;
1720             transaction_checkpoint();
1721         }
1722         else {
1723             error_log(LOG_DEBUG2, thread, "Management thread: Last checkpoint was %" APR_TIME_T_FMT " seconds ago.",
1724                 apr_time_sec(now - checkpoint_time_last));
1725         }
1726     }
1727
1728     return NULL;
1729 }
1730
1731 #ifndef WIN32
1732 /**
1733  * Thread to handle all signals
1734  */
1735 static void * APR_THREAD_FUNC thread_signals(apr_thread_t *thread, void *data)
1736 {
1737     apr_status_t rc;
1738
1739     error_log(LOG_DEBUG, thread, "Signal thread: Starting.");
1740     rc = apr_signal_thread(handle_signals);
1741     if (rc != APR_SUCCESS) {
1742         error_log(LOG_DEBUG, thread, "Signal thread: Error %d", rc);
1743         logc_shutdown(1);
1744     }
1745
1746     return NULL;
1747 }
1748 #endif /* WIN32 */
1749
1750 /**
1751  * The main loop where we receive log entries from
1752  * Apache and add them to the queue, sometimes creating
1753  * new worker threads to handle them.
1754  */
1755 static void receive_loop(void) {
1756     apr_file_t *fd_stdin;
1757     apr_size_t nbytes = PIPE_BUF_SIZE;
1758     char *buf = apr_palloc(pool, PIPE_BUF_SIZE + 1);
1759     char errstr[1024];
1760     apr_size_t evnt = 0; /* Index in buf to first event char */
1761     apr_size_t curr = 0; /* Index in buf to current processing char */
1762     apr_size_t next = 0; /* Index in buf to next unused char */
1763     int done = 0;
1764     int drop_next = 0;
1765     int buffered_events = 0;
1766     int count = 0;
1767     apr_pool_t *tmp_pool;
1768
1769     /* Open stdin. */
1770     if (apr_file_open_stdin(&fd_stdin, pool) != APR_SUCCESS) {
1771         error_log(LOG_ERROR, NULL, "Unable to open stdin for reading");
1772         logc_shutdown(1);
1773     }
1774
1775     /* Always want this NUL terminated */
1776     buf[PIPE_BUF_SIZE] = '\0';
1777
1778     apr_pool_create(&tmp_pool, NULL);
1779
1780     /* Loop forever receiving entries from stdin. */
1781     while(!done || (curr < next)) {
1782         apr_status_t rc;
1783
1784         if (error_log_level >= LOG_DEBUG2) {
1785             error_log(LOG_DEBUG2, NULL, "Internal state: [evnt \"%" APR_SIZE_T_FMT "\"][curr \"%" APR_SIZE_T_FMT "\"][next \"%" APR_SIZE_T_FMT "\"][nbytes \"%" APR_SIZE_T_FMT "\"]", evnt, curr, next, nbytes);
1786         }
1787
1788         /* If we are not done and have the space, read more */
1789         if (!done && (nbytes > 0)) {
1790             buffered_events = 0;
1791             nbytes = PIPE_BUF_SIZE - next;
1792             rc = apr_file_read(fd_stdin, (buf + next), &nbytes);
1793             if (rc != APR_SUCCESS) {
1794                 if (have_read_data) {
1795                     error_log(LOG_NOTICE, NULL, "No more data to read, emptying buffer: %s", apr_strerror(rc, errstr, 1024));
1796                 }
1797                 done = 1;
1798             }
1799             else {
1800                 have_read_data = 1;
1801                 if (error_log_level == LOG_DEBUG) {
1802                     error_log(LOG_DEBUG, NULL, "Read %" APR_SIZE_T_FMT " bytes from pipe", nbytes);
1803                 }
1804                 else {
1805                     error_log(LOG_DEBUG2, NULL, "Read %" APR_SIZE_T_FMT " bytes from pipe: `%s'", nbytes, _log_escape(tmp_pool, (buf + next), nbytes));
1806                 }
1807             }
1808
1809             next += nbytes;
1810         }
1811
1812         /**
1813          * Each chunk of data we receive can contain one or more lines for
1814          * which we need to find the EOL marker and then queue the event
1815          * up to that.  So, find/queue as many lines in the buffer as we
1816          * can.  Any remaining data will get shifted back to the beginning
1817          * of the buffer and the buffer size for the next read adjusted.
1818          */
1819         while(curr < next) {
1820             /* Look for EOL so we can parse the event */
1821             while((curr < next) && (buf[curr] != 0x0a)) {
1822                 curr++;
1823             }
1824             if (buf[curr] == 0x0a) {
1825                 buf[curr] = '\0';
1826
1827                 /* We may have to drop this one if it previously failed */
1828                 if (drop_next) {
1829                     error_log(LOG_ERROR, NULL, "Dropping remaining portion of failed event: `%s'", _log_escape(tmp_pool, (buf + evnt), (curr - evnt)));
1830                     drop_next = 0;
1831                 }
1832                 else {
1833                     transaction_log(TXIN, buf + evnt);
1834                     error_log(LOG_DEBUG2, NULL, "Received audit log entry (count %lu queue %d workers %d): %s",
1835                         entry_counter, queue->nelts, current_workers, _log_escape(tmp_pool, (buf + evnt), strlen(buf + evnt)));
1836                     add_entry(buf + evnt, 1);
1837                     buffered_events++;
1838                 }
1839
1840                 /* Advance indexes to next event in buf */
1841                 evnt = curr = curr + 1;
1842             }
1843             else {
1844                 error_log(LOG_DEBUG2, NULL, "Event buffer contains partial event: `%s'", _log_escape(tmp_pool, (buf + evnt), (next - evnt)));
1845                 break;
1846             }
1847         }
1848
1849
1850         if (buffered_events > 0) {
1851             error_log(LOG_DEBUG, NULL, "Processed %d entries from buffer.", buffered_events);
1852
1853             /* Move the unused portion of the buffer to the beginning */
1854             next -= evnt;
1855             curr -= evnt;
1856             memmove(buf, (buf + evnt), next);
1857
1858             error_log(LOG_DEBUG2, NULL, "Shifted buffer back %" APR_SIZE_T_FMT " and offset %" APR_SIZE_T_FMT " bytes for next read: `%s'", evnt, next, _log_escape(tmp_pool, buf, next));
1859
1860             evnt = 0;
1861         }
1862         else if (next == PIPE_BUF_SIZE) {
1863             /**
1864              * There is a chance we could fill the buffer, but not have finished
1865              * reading the event (no EOL yet), so we need to say so and drop
1866              * all data until we find the end of the event that is too large.
1867              */
1868
1869             if (drop_next) {
1870                 error_log(LOG_ERROR, NULL, "Event continuation too large, dropping it as well: `%s'", _log_escape(tmp_pool, buf, PIPE_BUF_SIZE));
1871             }
1872             else {
1873                 error_log(LOG_ERROR, NULL, "Event too large, dropping event: `%s'", _log_escape(tmp_pool, buf, PIPE_BUF_SIZE));
1874             }
1875
1876             /* Rewind buf and mark that we need to drop up to the next event */
1877             evnt = curr = next = 0;
1878             drop_next = 1;
1879         }
1880
1881         nbytes = PIPE_BUF_SIZE - next;
1882
1883         if (count++ > 1000) {
1884             count = 0;
1885             error_log(LOG_DEBUG, NULL, "Recycling tmp_pool.");
1886             apr_pool_destroy(tmp_pool);
1887             apr_pool_create(&tmp_pool, NULL);
1888         }
1889         else {
1890             apr_pool_clear(tmp_pool);
1891         }
1892     }
1893
1894     /* Wait for queue to empty if specified */
1895     if ((server_error == 0) && (opt_force != 0) && (queue->nelts > 0)) {
1896         error_log(LOG_NOTICE, NULL, "Waiting for queue to empty (%d active).", queue->nelts);
1897         while ((server_error == 0) && (opt_force != 0) && (queue->nelts > 0)) {
1898             apr_sleep(10 * 1000);
1899         }
1900         if (queue->nelts > 0) {
1901             error_log(LOG_ERROR, NULL, "Could not empty queue (%d active).", queue->nelts);
1902         }
1903     }
1904 }
1905
1906
1907 /**
1908  * Creates the management thread.
1909  */
1910 static void start_management_thread(void)
1911 {
1912     apr_thread_t *thread = NULL;
1913     apr_threadattr_t *thread_attrs;
1914     apr_status_t rc;
1915
1916     apr_threadattr_create(&thread_attrs, pool);
1917     apr_threadattr_detach_set(thread_attrs, 1);
1918     apr_threadattr_stacksize_set(thread_attrs, 1024);
1919
1920     management_thread_active = 1;
1921
1922     rc = apr_thread_create(&thread, thread_attrs, thread_manager, NULL, pool);
1923     if (rc != APR_SUCCESS) {
1924         error_log(LOG_ERROR, NULL, "Failed to create new management thread: %d", rc);
1925         management_thread_active = 0;
1926         logc_shutdown(1);
1927     }
1928 }
1929 #ifndef WIN32
1930 /**
1931  * Creates a thread to handle all signals
1932  */
1933 static void start_signal_thread(void)
1934 {
1935     apr_thread_t *thread = NULL;
1936     apr_threadattr_t *thread_attrs;
1937     apr_status_t rc;
1938
1939     apr_threadattr_create(&thread_attrs, pool);
1940     apr_threadattr_detach_set(thread_attrs, 1);
1941     apr_threadattr_stacksize_set(thread_attrs, 1024);
1942
1943     rc = apr_thread_create(&thread, thread_attrs, thread_signals, NULL, pool);
1944     if (rc != APR_SUCCESS) {
1945         error_log(LOG_ERROR, NULL, "Failed to create new signal thread: %d", rc);
1946         logc_shutdown(1);
1947     }
1948 }
1949 #endif /* WIN32 */
1950
1951 /**
1952  * Usage text.
1953  */
1954 static void usage(void) {
1955     fprintf(stderr, "ModSecurity Log Collector (mlogc) v%s\n", VERSION);
1956     fprintf(stderr, "  Usage: mlogc [options] /path/to/the/mlogc.conf\n");
1957     fprintf(stderr, "\n");
1958     fprintf(stderr, "  Options:\n");
1959     fprintf(stderr, "    -f        Force depletion of queue on exit\n");
1960     fprintf(stderr, "    -v        Version information\n");
1961     fprintf(stderr, "    -h        This help\n\n");
1962 }
1963
1964 /**
1965  * Version text.
1966  */
1967 static void version(void) {
1968     fprintf(stderr, "ModSecurity Log Collector (mlogc) v%s\n", VERSION);
1969     fprintf(stderr, "   APR: compiled=\"%s\"; loaded=\"%s\"\n", APR_VERSION_STRING, apr_version_string());
1970     fprintf(stderr, "  PCRE: compiled=\"%d.%d\"; loaded=\"%s\"\n", PCRE_MAJOR, PCRE_MINOR, pcre_version());
1971     fprintf(stderr, "  cURL: compiled=\"%s\"; loaded=\"%s\"\n", LIBCURL_VERSION, curl_version());
1972     fprintf(stderr, "\n");
1973 }
1974
1975 /**
1976  * This is the main entry point.
1977  */
1978 int main(int argc, const char * const argv[]) {
1979     apr_getopt_t *opt;
1980     apr_status_t rc;
1981
1982     apr_app_initialize(&argc, &argv, NULL);
1983     atexit(apr_terminate);
1984
1985     curl_global_init(CURL_GLOBAL_ALL);
1986     atexit(logc_cleanup);
1987
1988     logc_pid = getpid();
1989     apr_pool_create(&pool, NULL);
1990     apr_pool_create(&recv_pool, NULL);
1991         
1992 #ifndef WIN32
1993     apr_setup_signal_thread();
1994 #else
1995         apr_signal(SIGINT, handle_signals);
1996         apr_signal(SIGTERM, handle_signals);
1997 #endif /* WIN32 */
1998
1999     if (argc < 2) {
2000         usage();
2001         logc_shutdown(1);
2002     }
2003
2004     /* Commandline opts */
2005     rc = apr_getopt_init(&opt, pool, argc, argv);
2006     if (rc != APR_SUCCESS) {
2007         usage();
2008         logc_shutdown(1);
2009     }
2010
2011     do {
2012         char  ch;
2013         const char *val;
2014         rc = apr_getopt(opt, CMDLINE_OPTS, &ch, &val);
2015         switch (rc) {
2016         case APR_SUCCESS:
2017             switch (ch) {
2018                 case 'f':
2019                     opt_force = 1;
2020                     break;
2021                 case 'v':
2022                     version();
2023                     logc_shutdown(0);
2024                 case 'h':
2025                     usage();
2026                     logc_shutdown(0);
2027             }
2028             break;
2029         case APR_BADCH:
2030         case APR_BADARG:
2031             usage();
2032             logc_shutdown(1);
2033         }
2034     } while (rc != APR_EOF);
2035
2036     /* Conf file is last */
2037     conffile = argv[argc - 1];
2038
2039     read_configuration();
2040     init_configuration();
2041
2042     logc_init();
2043     transaction_log_init();
2044
2045     running = 1;
2046     server_error = 0;
2047
2048     start_management_thread();
2049 #ifndef WIN32
2050     start_signal_thread();
2051 #endif /* WIN32 */
2052
2053     /* Process stdin until EOF */
2054     receive_loop();
2055
2056     logc_shutdown(0);
2057
2058     return 0;
2059 }