2 * ModSecurity for Apache 2.x, http://www.modsecurity.org/
3 * Copyright (c) 2004-2009 Breach Security, Inc. (http://www.breach.com/)
5 * This product is released under the terms of the General Public Licence,
6 * version 2 (GPLv2). Please refer to the file LICENSE (included with this
7 * distribution) which contains the complete text of the licence.
9 * There are special exceptions to the terms and conditions of the GPL
10 * as it is applied to this software. View the full text of the exception in
11 * file MODSECURITY_LICENSING_EXCEPTION in the directory of this software
14 * If any of the files related to licensing are missing or if you have any
15 * other questions related to licensing please contact Breach Security, Inc.
16 * directly using the email address support@breach.com.
21 #include <apr_errno.h>
22 #include <apr_general.h>
23 #include <apr_file_io.h>
24 #include <apr_file_info.h>
27 #include <apr_strings.h>
28 #include <apr_signal.h>
29 #include <apr_thread_proc.h>
30 #include <apr_global_mutex.h>
31 #include <apr_getopt.h>
32 #include <apr_version.h>
34 #include <unistd.h> /* for getpid() */
37 #include <curl/curl.h>
40 #include <sys/types.h>
42 #include "msc_release.h"
/* Forward declarations for functions that are used before they are defined. */
44 static void logc_shutdown(int rc);
45 static void create_new_worker(int lock);
/* printf-style logger. PRINTF_ATTRIBUTE(3,4) presumably tells the compiler that
 * argument 3 is the format string and the variadic arguments start at 4 --
 * the macro is defined elsewhere, confirm there. */
46 static void error_log(int level, void *thread, const char *text, ...) PRINTF_ATTRIBUTE(3,4);
51 /* Error log levels. */
58 /* The management thread will wake up every five seconds. */
/* Sleep intervals for the management thread. Presumably microseconds
 * (5000000 us = five seconds) -- TODO confirm at the point of use. */
59 #define MANAGER_SLEEP 5000000
60 #define MANAGER_SUBSLEEP 10000
62 /* Hack to allow multiple mlogc with single delete */
63 #define KEEP_ENTRIES_REMOVE_HACK 2600
/* mtime value that keep_entries_hack() compares against and sets on a file. */
64 #define KEEP_ENTRIES_REMOVE_TIME 0l
/* Test aid. The first definition picks a value from rand() scaled by n, logs it
 * at LOG_DEBUG2 and sleeps that many seconds; the second definition expands to
 * nothing. Which one is active is decided by preprocessor conditionals that are
 * not visible here. */
66 #define TEST_WITH_RAND_SLEEP(n) \
68 int sec = rand()/(RAND_MAX/n); \
69 error_log(LOG_DEBUG2, NULL, "TEST_HACK: Sleeping for %ds", sec); \
70 apr_sleep(apr_time_from_sec(sec)); \
73 #define TEST_WITH_RAND_SLEEP(n)
/* Number of ints in the capture-offset array handed to pcre_exec(). */
76 #define CAPTUREVECTORSIZE 60
77 #define PIPE_BUF_SIZE 65536
/* Message logged whenever an allocation returns NULL. */
78 #define MEMALLOC_ERROR_MSG "Memory allocation failed!"
79 #define VERSION MODSEC_VERSION
/* Command-line option letters; presumably consumed via apr_getopt -- confirm. */
81 #define CMDLINE_OPTS "fvh"
/* Size of the HTTP status text buffer and upper bound of the scan in
 * curl_writefunction(). */
86 #define STATUSBUF_SIZE 256
/*
 * Evaluates to non-zero when X is an ASCII hexadecimal digit (0-9, a-f, A-F).
 * Every use of X is parenthesised so that expression arguments (for example
 * `c & 0x7f`) keep their intended meaning. X is still evaluated more than
 * once, so do not pass arguments with side effects.
 */
#define ISHEXCHAR(X) ((((X) >= '0')&&((X) <= '9')) || (((X) >= 'a')&&((X) <= 'f')) || (((X) >= 'A')&&((X) <= 'F')))
90 /* -- Regex Patterns -- */
93 * This regular expression is used to parse the entire
94 * log line we receive from Apache. The REQUEST_LINE is
95 * treated as a single parameter to allow for invalid
/* PCRE pattern for one complete log line. thread_worker() refers to its capture
 * groups by number (13, 15 and 18 in the visible code), so the group order must
 * stay stable when this pattern is edited. */
98 const char logline_pattern[] =
100 "\\ (\\S+)\\ (\\S+)\\ (\\S+)"
101 "\\ \\[([^:]+):(\\d+:\\d+:\\d+)\\ ([^\\]]+)\\]"
104 "\\ \"(.*)\"\\ \"(.*)\""
105 "\\ (\\S+)\\ \"(.*)\""
106 "\\ /?(\\S+)\\ (\\d+)\\ (\\d+)"
112 * This regular expression can be used to parse
113 * a REQUEST_LINE field into method, URI, and
/* Three captures separated by single spaces: a run of non-space characters,
 * a lazily matched middle part, and a final run of non-space characters. */
const char requestline_pattern[] = "(\\S+)\\ (.*?)\\ (\\S+)";
120 /* -- Structures -- */
/* Identifier assigned from entry_counter when the entry is queued; also used
 * as the key in the in_progress hash. */
123 unsigned long int id;
/* Length of the entry's line in bytes (add_entry() sets it with strlen()). */
125 apr_size_t line_size;
129 /* -- Global variables -- */
/* Process-wide state. Most values below are defaults that init_configuration()
 * overrides from the configuration table. */
132 const char *conffile = NULL;
133 const char *lockfile = NULL;
/* transaction_checkpoint() returns early while this is zero. */
134 int have_read_data = 0;
135 int checkpoint_interval = 60;
136 apr_time_t checkpoint_time_last = 0;
/* Base directory that file_path() merges other configured paths onto. */
137 const char *collector_root = NULL;
138 apr_table_t *conf = NULL;
139 const char *console_uri = NULL;
/* Array of CURL * handles created in logc_init(), one per allowed connection. */
140 apr_array_header_t *curl_handles = NULL;
141 int current_workers = 0;
142 int management_thread_active = 0;
/* Source of entry ids; add_entry() post-increments it under the thread mutex. */
143 unsigned long int entry_counter = 1;
144 const char *error_log_path = NULL;
/* When NULL, error_log() writes to stderr instead. */
145 apr_file_t *error_log_fd = NULL;
146 int error_log_level = 2;
/* Hash of entries currently being processed by workers, keyed by entry id. */
147 apr_hash_t *in_progress = NULL;
148 int keep_alive = 150; /* Not used yet. */
149 int keep_alive_timeout = 300; /* Not used yet. */
150 int keep_entries = 0;
151 const char *log_repository = NULL;
/* Compiled form of logline_pattern (result of pcre_compile()). */
152 void *logline_regex = NULL;
153 int max_connections = 10;
154 int max_worker_requests = 1000;
/* gmutex is created from lockfile and guards queue-file initialisation and
 * checkpoints; mutex guards the in-memory queue and in_progress hash. */
155 apr_global_mutex_t *gmutex = NULL;
156 apr_thread_mutex_t *mutex = NULL;
157 apr_pool_t *pool = NULL;
158 apr_pool_t *thread_pool = NULL;
159 apr_pool_t *recv_pool = NULL;
/* Array of entry_t * waiting to be processed. */
160 apr_array_header_t *queue = NULL;
161 const char *queue_path = NULL;
162 /* apr_time_t queue_time = 0; */
/* Compiled form of requestline_pattern (result of pcre_compile()). */
163 void *requestline_regex = NULL;
165 const char *sensor_password = NULL;
166 const char *sensor_username = NULL;
/* Non-zero stops add_entry() from starting new workers. */
167 int server_error = 0;
168 apr_time_t server_error_last_check_time = 0;
169 int server_error_timeout = 60;
170 int startup_delay = 100;
171 int transaction_delay = 100;
172 const char *transaction_log_path = NULL;
173 apr_file_t *transaction_log_fd = NULL;
176 /* -- Commandline opts -- */
/*
 * Builds an escaped copy of input (input_len bytes) in memory taken from pool mp.
 * The output buffer is sized at four bytes per input byte plus one.
 * Returns NULL when input is NULL or when the allocation fails.
 */
182 static char *_log_escape(apr_pool_t *mp, const char *input, apr_size_t input_len)
184 static const char c2x_table[] = "0123456789abcdef";
185 unsigned char *d = NULL;
189 if (input == NULL) return NULL;
191 ret = apr_palloc(mp, input_len * 4 + 1);
192 if (ret == NULL) return NULL;
193 d = (unsigned char *)ret;
196 while(i < input_len) {
/* Bytes outside the range 0x20..0x7e are written as two hex digits taken
 * from c2x_table.
 * NOTE(review): input is plain char. Where char is signed, bytes >= 0x80 are
 * negative, so input[i] >> 4 can produce a negative table index -- confirm and
 * consider reading the byte through unsigned char. */
227 if ((input[i] <= 0x1f)||(input[i] >= 0x7f)) {
230 *d++ = c2x_table[input[i] >> 4];
231 *d++ = c2x_table[input[i] & 0x0f];
247 * Converts a byte given as its hexadecimal representation
248 * into a proper byte. Handles uppercase and lowercase letters
249 * but does not check for overflows.
/*
 * Converts the two hexadecimal digits at what[0] and what[1] into the byte
 * they encode. Upper- and lowercase letters are accepted. The caller must
 * have validated both characters (see ISHEXCHAR); no checking is done here.
 *
 * what: pointer to at least two readable characters.
 * Returns the decoded byte value.
 */
static unsigned char x2c(unsigned char *what) {
    unsigned char digit;

    /* High nibble. Clearing bit 0x20 folds 'a'..'f' onto 'A'..'F'. */
    digit = (unsigned char)(what[0] >= 'A' ? ((what[0] & 0xdf) - 'A') + 10 : (what[0] - '0'));
    digit *= 16;

    /* Low nibble. */
    digit += (unsigned char)(what[1] >= 'A' ? ((what[1] & 0xdf) - 'A') + 10 : (what[1] - '0'));

    return digit;
}
262 * URL Decodes a string in-place
/*
 * Decodes input (input_len bytes) in place: the write pointer d starts at the
 * beginning of input and the read index i walks the same buffer.
 * Returns 0 when input is NULL.
 */
264 static int urldecode_inplace(unsigned char *input, apr_size_t input_len) {
265 unsigned char *d = (unsigned char *)input;
268 if (input == NULL) return 0;
271 while (i < input_len) {
272 if (input[i] == '%') {
273 /* Character is a percent sign. */
275 /* Are there enough bytes available? */
/* Both digits, at i + 1 and i + 2, must lie inside the buffer. */
276 if (i + 2 < input_len) {
277 char c1 = input[i + 1];
278 char c2 = input[i + 2];
280 if (ISHEXCHAR(c1) && ISHEXCHAR(c2)) {
281 /* Valid encoding - decode it. */
282 *d++ = x2c(&input[i + 1]);
285 /* Not a valid encoding, skip this % */
289 /* Not enough bytes available, copy the raw bytes. */
293 /* Character is not a percent sign. */
/* '+' is handled separately from other literal characters. */
294 if (input[i] == '+') {
309 * Detect a relative path and merge it with the collector root
310 * path. Leave absolute paths as they are.
/*
 * Merges path onto collector_root with apr_filepath_merge(), allocating the
 * result from the global pool. Returns NULL when path is NULL.
 */
312 static const char *file_path(const char *path)
314 char *newpath = NULL;
317 if (path == NULL) return NULL;
319 rc = apr_filepath_merge(&newpath, collector_root, path, APR_FILEPATH_TRUENAME, pool);
/* Besides plain success, the wildcard, not-found and not-a-directory statuses
 * are tolerated as long as a merged path was produced; presumably newpath is
 * returned in that case -- the branch body is not visible here. */
320 if ((newpath != NULL) && (rc == APR_SUCCESS || APR_STATUS_IS_EPATHWILD(rc)
321 || APR_STATUS_IS_ENOENT(rc) || APR_STATUS_IS_ENOTDIR(rc)))
332 * Returns the current datetime as a string.
334 static char *current_logtime(char *dest, int dlen)
339 apr_time_exp_lt(&t, apr_time_now());
340 apr_strftime(dest, &len, dlen, "%a %b %d %H:%M:%S %Y", &t);
347 * Logs error to the error log (if available) or
/*
 * Formats a printf-style message and writes it with a
 * "[time] [level] [pid/thread]" prefix.
 * Messages whose level is above error_log_level are dropped.
 * thread is only printed as a pointer value and may be NULL.
 */
350 static void error_log(int level, void *thread, const char *text, ...)
/* Both buffers are fixed at 4096 bytes; the bounded formatters below are
 * called with sizeof(), so the text cannot overrun them. */
352 char msg1[4096] = "";
353 char msg2[4096] = "";
357 if (level > error_log_level) return;
361 apr_vsnprintf(msg1, sizeof(msg1), text, ap);
362 apr_snprintf(msg2, sizeof(msg2), "[%s] [%d] [%" APR_PID_T_FMT "/%pp] %s\n", current_logtime(datetime, sizeof(datetime)), level, logc_pid, (thread ? thread : 0), msg1);
/* Write to the error log file when one is open; stderr is used otherwise. */
364 if (error_log_fd != NULL) {
365 apr_size_t nbytes_written;
366 apr_size_t nbytes = strlen(msg2);
367 apr_file_write_full(error_log_fd, msg2, nbytes, &nbytes_written);
370 fprintf(stderr, "%s", msg2);
378 * Adds one entry to the internal queue. It will (optionally) start
379 * a new thread to handle it.
381 static void add_entry(const char *data, int start_worker)
383 entry_t *entry = NULL;
385 entry = (entry_t *)malloc(sizeof(entry_t));
387 entry->line = strdup(data);
388 entry->line_size = strlen(entry->line);
390 error_log(LOG_DEBUG, NULL, "Queue locking thread mutex.");
391 if (APR_STATUS_IS_EBUSY(apr_thread_mutex_trylock(mutex))) {
392 error_log(LOG_DEBUG, NULL, "Queue waiting on thread mutex.");
393 apr_thread_mutex_lock(mutex);
396 /* Assign unique ID to this log entry. */
397 entry->id = entry_counter++;
399 /* Add the new audit log entry to the queue. */
400 *(entry_t **)apr_array_push(queue) = entry;
402 /* Create a new worker if we can, but not if there is a known problem with the server. */
403 if ((start_worker != 0)&&(current_workers < max_connections)&&(server_error == 0)) {
404 create_new_worker(0);
407 error_log(LOG_DEBUG, NULL, "Queue unlocking thread mutex.");
408 apr_thread_mutex_unlock(mutex);
413 * Read the queue entries.
/*
 * Reads a queue file line by line (up to 4096 bytes per read) from fd.
 * The first line is parsed as a timestamp into *queue_time; later lines that
 * are not comments are handed to add_entry() without starting a worker.
 * Callers log the returned int as the number of entries loaded.
 */
415 static int read_queue_entries(apr_file_t *fd, apr_time_t *queue_time)
421 apr_status_t rc = apr_file_gets(linebuf, 4096, fd);
424 if (rc == APR_EOF) break;
425 if (rc != APR_SUCCESS) {
426 error_log(LOG_ERROR, NULL, "Error reading from the queue file.");
430 if (line_count < 0) {
431 /* First line contains the queue time. */
432 *queue_time = (apr_time_t)apr_atoi64(linebuf);
439 /* Remove the \n from the end of the line. */
448 if (linebuf[0] == '#') { /* Ignore comments. */
/* add_entry() duplicates the text, so linebuf can be reused for the next line. */
452 add_entry((const char *)&linebuf, 0);
464 * Initialise the transaction log. This code should be
465 * executed only once at startup.
/*
 * Startup-time recovery of the queue, followed by opening the transaction log.
 * Under the global mutex it loads entries from the queue file, or from the
 * ".old" queue file when the main one cannot be opened, removes stale ".new"
 * and ".old" files, records the checkpoint time and opens the transaction log
 * for appending.
 */
467 static void transaction_log_init(void)
469 /* ENH: These big enough? */
/* NOTE(review): apr_snprintf() is bounded by sizeof(), so a queue_path longer
 * than about 250 characters would be truncated here -- confirm that is acceptable. */
470 char new_queue_path[256];
471 char old_queue_path[256];
472 apr_file_t *queue_fd = NULL;
473 apr_time_t queue_time;
475 apr_snprintf(new_queue_path, sizeof(new_queue_path), "%s.new", queue_path);
476 apr_snprintf(old_queue_path, sizeof(old_queue_path), "%s.old", queue_path);
478 /* Put a lock in place to ensure exclusivity. */
479 error_log(LOG_DEBUG, NULL, "Transaction initialization locking global mutex.");
480 if (APR_STATUS_IS_EBUSY(apr_global_mutex_trylock(gmutex))) {
481 error_log(LOG_DEBUG, NULL, "Transaction initialization waiting on global mutex.");
482 apr_global_mutex_lock(gmutex);
485 error_log(LOG_DEBUG, NULL, "Transaction initialization started.");
487 /* Delete .new file if there is one. */
488 apr_file_remove(new_queue_path, pool);
490 /* Read in the data from the queue. */
491 if (apr_file_open(&queue_fd, queue_path, APR_READ | APR_FILE_NOCLEANUP,
492 0, pool) == APR_SUCCESS)
494 int line_count = read_queue_entries(queue_fd, &queue_time);
496 apr_file_close(queue_fd);
498 if (line_count > 0) {
499 error_log(LOG_NOTICE, NULL, "Loaded %d entries from the queue file.", line_count);
502 /* Try the old queue file. */
503 else if (apr_file_open(&queue_fd, old_queue_path, APR_READ | APR_FILE_NOCLEANUP,
504 0, pool) == APR_SUCCESS)
506 int line_count = read_queue_entries(queue_fd, &queue_time);
507 apr_file_close(queue_fd);
508 error_log(LOG_NOTICE, NULL, "Loaded %d entries from the OLD queue file.", line_count);
/* Promote the old file so that it becomes the current queue file. */
509 apr_file_rename(old_queue_path, queue_path, pool);
512 error_log(LOG_NOTICE, NULL, "Queue file not found. New one will be created.");
515 /* Delete the old queue file. */
516 apr_file_remove(old_queue_path, pool);
518 checkpoint_time_last = apr_time_now();
520 /* Start fresh with the transaction log. Do note that
521 * we do not truncate the transaction log on purpose. Apache
522 * will start copies of piped logging binaries during configuration
523 * testing. Truncating would erase the log of a currently running
526 if (apr_file_open(&transaction_log_fd, transaction_log_path, APR_WRITE | APR_CREATE
527 | APR_APPEND | APR_XTHREAD, APR_OS_DEFAULT, pool) != APR_SUCCESS)
529 error_log(LOG_ERROR, NULL, "Failed to open the transaction log: %s\n", transaction_log_path);
530 error_log(LOG_DEBUG, NULL, "Transaction initialization unlocking global mutex.");
531 apr_global_mutex_unlock(gmutex);
535 error_log(LOG_DEBUG, NULL, "Transaction initialization completed.");
538 error_log(LOG_DEBUG, NULL, "Transaction initialization unlocking global mutex.");
539 apr_global_mutex_unlock(gmutex);
544 * Log entry event (incoming or outgoing) to the transaction log.
/*
 * Appends one record of the form "<epoch seconds> IN: <entry>" or
 * "<epoch seconds> OUT: <entry>" to the transaction log file.
 * direction == TXIN selects "IN"; any other value selects "OUT".
 */
546 static void transaction_log(int direction, const char *entry)
548 apr_size_t nbytes, nbytes_written;
/* The record is built in a fixed-size buffer; apr_snprintf() truncates entries
 * that do not fit. */
551 apr_snprintf(msg, sizeof(msg), "%u %s: %s\n", (unsigned int)apr_time_sec(apr_time_now()),
552 (direction == TXIN ? "IN" : "OUT"), entry);
553 nbytes = strlen(msg);
554 apr_file_write_full(transaction_log_fd, msg, nbytes, &nbytes_written);
559 * Executes a checkpoint, which causes the current queue to be
560 * written to a file and the transaction log to be truncated.
/*
 * Writes a checkpoint: under the global mutex, every queued and in-progress
 * entry is dumped into "<queue_path>.new", that file is swapped in as the
 * queue file, and the transaction log is truncated to zero length.
 * Nothing is done while have_read_data is zero.
 */
562 static void transaction_checkpoint(void)
564 /* ENH: These big enough? */
565 char new_queue_path[256];
566 char old_queue_path[256];
567 apr_file_t *queue_fd = NULL;
568 apr_hash_index_t *hi = NULL;
573 apr_snprintf(new_queue_path, sizeof(new_queue_path), "%s.new", queue_path);
574 apr_snprintf(old_queue_path, sizeof(old_queue_path), "%s.old", queue_path);
/* msg holds the timestamp header that becomes the first line of the new queue file. */
575 apr_snprintf(msg, sizeof(msg), "%u\n", (unsigned int)apr_time_sec(apr_time_now()));
577 if (! have_read_data) {
578 error_log(LOG_DEBUG, NULL, "Checkpoint not required.");
582 /* Put a lock in place to ensure exclusivity. */
583 error_log(LOG_DEBUG, NULL, "Checkpoint locking global mutex.");
584 if (APR_STATUS_IS_EBUSY(apr_global_mutex_trylock(gmutex))) {
585 error_log(LOG_DEBUG, NULL, "Checkpoint waiting on global mutex.");
586 apr_global_mutex_lock(gmutex);
589 error_log(LOG_DEBUG, NULL, "Checkpoint started.");
/* Scratch pool that lives only for this checkpoint; destroyed on every exit path below. */
591 apr_pool_create(&cpool, NULL);
593 /* Dump active entries into a new queue file. */
/* APR_EXCL is combined with APR_CREATE, so the open is refused when a ".new"
 * file is already present. */
594 if (apr_file_open(&queue_fd, new_queue_path, APR_WRITE | APR_CREATE
595 | APR_EXCL | APR_TRUNCATE | APR_FILE_NOCLEANUP, APR_OS_DEFAULT, cpool) != APR_SUCCESS)
597 error_log(LOG_ERROR, NULL, "Failed to create file: %s", new_queue_path);
598 error_log(LOG_DEBUG, NULL, "Checkpoint unlocking global mutex.");
599 apr_pool_destroy(cpool);
600 apr_global_mutex_unlock(gmutex);
604 /* Write the time first. */
/* NOTE(review): none of the write, rename or truncate calls in the visible
 * lines has its status checked; a full disk would go unnoticed -- confirm intent. */
605 apr_file_write_full(queue_fd, msg, strlen(msg), NULL);
607 /* Dump the entries sitting in the queue first. */
608 for (i = 0; i < queue->nelts; i++) {
609 entry_t *entry = ((entry_t **)queue->elts)[i];
610 apr_file_write_full(queue_fd, entry->line, entry->line_size, NULL);
611 apr_file_write_full(queue_fd, &"\n", 1, NULL);
613 error_log(LOG_DEBUG2, NULL, "Checkpoint wrote %d queued entries to new queue.", i);
615 /* Then dump the ones that are currently being processed. */
617 for (hi = apr_hash_first(NULL, in_progress); hi != NULL; hi = apr_hash_next(hi)) {
619 entry_t *entry = NULL;
622 apr_hash_this(hi, NULL, NULL, &e);
623 entry = e; /* quiet type-punned warning */
624 apr_file_write_full(queue_fd, entry->line, entry->line_size, NULL);
625 apr_file_write_full(queue_fd, &"\n", 1, NULL);
627 error_log(LOG_DEBUG2, NULL, "Checkpoint wrote %d additional entries to new queue.", i);
629 apr_file_close(queue_fd);
631 /* Switch the files and truncate the transaction log file. */
/* Order: drop any stale ".old", move the current queue to ".old", move ".new"
 * into place, then drop ".old" again. */
632 apr_file_remove(old_queue_path, cpool);
633 apr_file_rename(queue_path, old_queue_path, cpool);
634 apr_file_rename(new_queue_path, queue_path, cpool);
635 apr_file_remove(old_queue_path, cpool);
636 apr_file_trunc(transaction_log_fd, 0);
638 error_log(LOG_DEBUG, NULL, "Checkpoint completed.");
640 apr_pool_destroy(cpool);
642 /* Unlock and exit. */
643 error_log(LOG_DEBUG, NULL, "Checkpoint unlocking global mutex.");
644 apr_global_mutex_unlock(gmutex);
649 * Parse one configuration line and add it to the
650 * configuration table.
/*
 * Splits one configuration line into a directive name (the first
 * whitespace-delimited word) and its value, then stores the pair in conf.
 * Empty lines and lines whose first non-space character is '#' are ignored.
 * apr_table_set() replaces an earlier value, so the last occurrence of a
 * directive wins.
 */
652 static void parse_configuration_line(const char *line, int line_count)
654 char *start = NULL, *command = NULL;
657 /* Remove the trailing newline character. */
659 while(*p != '\0') p++;
660 if ((p > start)&&(*(p - 1) == '\n')) *(p - 1) = '\0';
663 /* Ignore whitespace at the beginning of the line. */
664 while(apr_isspace(*p)) p++;
666 /* Ignore empty lines and comments. */
667 if ((*p == '\0')||(*p == '#')) return;
/* Advance to the end of the directive name. */
670 while(!apr_isspace(*p)&&(*p != '\0')) p++;
/* The name is copied into the global pool. */
672 command = apr_pstrmemdup(pool, start, p - start);
/* Skip the whitespace that separates the name from its value. */
674 while(apr_isspace(*p)) p++;
676 /* Remove whitespace at the end. */
678 while(*p != '\0') p++;
681 while(apr_isspace(*p)) {
686 /* Remove quotes, but only if we have matching */
687 if ((*start == '"') && (p > start) && (*p == '"')) {
692 /* Take the last directive */
693 /* ENH: Error on dup directives? */
694 apr_table_set(conf, command, start);
699 * Reads configuration from a file.
/*
 * Creates the configuration table and fills it by feeding every line of
 * conffile (read in chunks of up to 4096 bytes) to parse_configuration_line().
 */
701 static void read_configuration(void)
708 conf = apr_table_make(pool, 32);
710 error_log(LOG_ERROR, NULL, MEMALLOC_ERROR_MSG);
/* APR_FILE_NOCLEANUP: the pool will not close this file automatically. */
714 rc = apr_file_open(&fd, conffile, APR_READ | APR_FILE_NOCLEANUP, 0, pool);
715 if (rc != APR_SUCCESS) {
716 error_log(LOG_ERROR, NULL, "Unable to open configuration file: %s", conffile);
722 rc = apr_file_gets(linebuf, 4096, fd);
/* NOTE(review): this returns at end of file without closing fd, and the file
 * was opened with APR_FILE_NOCLEANUP -- looks like a descriptor leak, confirm. */
723 if (rc == APR_EOF) return;
724 if (rc != APR_SUCCESS) {
725 error_log(LOG_ERROR, NULL, "Error reading from the configuration file.");
730 parse_configuration_line(linebuf, line_count);
738 * Initialize the configuration.
/*
 * Copies settings from the conf table into the global variables, opens the
 * error log, applies the startup delay and logs each effective value at
 * LOG_DEBUG2. Path-valued settings are resolved through file_path().
 * Missing QueuePath, LogStorageDir, ConsoleURI, SensorUsername or
 * SensorPassword are reported at LOG_ERROR.
 * NOTE(review): numeric settings are parsed with atoi(), so malformed input is
 * not detected -- confirm whether validation is wanted.
 */
740 static void init_configuration(void)
744 const char *s = NULL;
746 /* Other values may be based off the collector root. */
747 s = apr_table_get(conf, "CollectorRoot");
753 s = apr_table_get(conf, "ErrorLog");
755 error_log_path = file_path(s);
758 s = apr_table_get(conf, "ErrorLogLevel");
760 error_log_level = atoi(s);
763 if ((rc = apr_file_open(&error_log_fd, error_log_path, APR_WRITE | APR_CREATE | APR_APPEND,
764 APR_OS_DEFAULT, pool)) != APR_SUCCESS)
766 error_log(LOG_ERROR, NULL, "Failed to open the error log %s: %s\n",
767 error_log_path, apr_strerror(rc, errstr, 1024));
771 error_log(LOG_NOTICE, NULL, "Configuring ModSecurity Audit Log Collector %s.", VERSION);
774 s = apr_table_get(conf, "StartupDelay");
776 startup_delay = atoi(s);
779 if ( startup_delay > 0 ) {
780 error_log(LOG_NOTICE, NULL, "Delaying execution for %dms.", startup_delay);
/* startup_delay is in milliseconds; multiplied by 1000 for apr_sleep(). */
781 apr_sleep(startup_delay * 1000);
782 error_log(LOG_DEBUG, NULL, "Continuing execution after %dms delay.", startup_delay);
785 /* Remaining Configuration */
787 error_log(LOG_DEBUG2, NULL, "CollectorRoot=%s", collector_root);
788 error_log(LOG_DEBUG2, NULL, "ErrorLog=%s", error_log_path);
789 error_log(LOG_DEBUG2, NULL, "ErrorLogLevel=%d", error_log_level);
790 error_log(LOG_DEBUG2, NULL, "StartupDelay=%d", startup_delay);
792 s = apr_table_get(conf, "CheckpointInterval");
794 checkpoint_interval = atoi(s);
795 error_log(LOG_DEBUG2, NULL, "CheckpointInterval=%d", checkpoint_interval);
798 s = apr_table_get(conf, "QueuePath");
800 queue_path = file_path(s);
801 error_log(LOG_DEBUG2, NULL, "QueuePath=%s", queue_path);
804 error_log(LOG_ERROR, NULL, "QueuePath not defined in the configuration file.");
808 s = apr_table_get(conf, "LockFile");
810 lockfile = file_path(s);
811 error_log(LOG_DEBUG2, NULL, "LockFile=%s", lockfile);
814 s = apr_table_get(conf, "ServerErrorTimeout");
816 server_error_timeout = atoi(s);
817 error_log(LOG_DEBUG2, NULL, "ServerErrorTimeout=%d", server_error_timeout);
820 s = apr_table_get(conf, "TransactionDelay");
822 transaction_delay = atoi(s);
823 error_log(LOG_DEBUG2, NULL, "TransactionDelay=%d", transaction_delay);
826 s = apr_table_get(conf, "TransactionLog");
828 transaction_log_path = file_path(s);
829 error_log(LOG_DEBUG2, NULL, "TransactionLog=%s", transaction_log_path);
/* The next four settings are only accepted when the parsed value v is not negative. */
832 s = apr_table_get(conf, "MaxConnections");
835 if (v >= 0) max_connections = v;
836 error_log(LOG_DEBUG2, NULL, "MaxConnections=%d", max_connections);
839 s = apr_table_get(conf, "MaxWorkerRequests");
842 if (v >= 0) max_worker_requests = v;
843 error_log(LOG_DEBUG2, NULL, "MaxWorkerRequests=%d", max_worker_requests);
846 s = apr_table_get(conf, "KeepAlive");
849 if (v >= 0) keep_alive = v;
850 error_log(LOG_DEBUG2, NULL, "KeepAlive=%d", keep_alive);
853 s = apr_table_get(conf, "KeepAliveTimeout");
856 if (v >= 0) keep_alive_timeout = v;
857 error_log(LOG_DEBUG2, NULL, "KeepAliveTimeout=%d", keep_alive_timeout);
860 s = apr_table_get(conf, "LogStorageDir");
862 log_repository = file_path(s);
863 error_log(LOG_DEBUG2, NULL, "LogStorageDir=%s", log_repository);
866 error_log(LOG_ERROR, NULL, "Missing mandatory parameter LogStorageDir.\n");
870 s = apr_table_get(conf, "ConsoleURI");
873 error_log(LOG_DEBUG2, NULL, "ConsoleURI=%s", console_uri);
876 error_log(LOG_ERROR, NULL, "Missing mandatory parameter ConsoleURI.\n");
880 s = apr_table_get(conf, "SensorUsername");
883 error_log(LOG_DEBUG2, NULL, "SensorUsername=%s", sensor_username);
886 error_log(LOG_ERROR, NULL, "Missing mandatory parameter SensorUsername.\n");
890 s = apr_table_get(conf, "SensorPassword");
/* NOTE(review): the sensor password is written to the error log in clear text
 * at LOG_DEBUG2 -- consider masking it. */
893 error_log(LOG_DEBUG2, NULL, "SensorPassword=%s", sensor_password);
896 error_log(LOG_ERROR, NULL, "Missing mandatory parameter SensorPassword.\n");
900 s = apr_table_get(conf, "KeepEntries");
902 keep_entries = atoi(s);
907 error_log(LOG_DEBUG2, NULL, "KeepEntries=%d", keep_entries);
912 * Clean-up resources before process shutdown.
/* Releases libcurl's global state via curl_global_cleanup(). */
914 static void logc_cleanup(void)
916 curl_global_cleanup();
921 * Shutdown the logger.
/*
 * Stops the collector: waits for the management thread to finish, logs a
 * termination message (normal, or with the error code rc) and flushes the
 * error log if one is open.
 */
923 static void logc_shutdown(int rc)
925 /* Tell the threads to shut down. */
928 error_log(LOG_DEBUG, NULL, "Shutting down");
930 /* Wait for the management thread to stop */
931 /* ENH: Need a fixed timeout if this never happens */
/* Polls the flag every 10 * 1000 microseconds.
 * NOTE(review): management_thread_active is a plain int that is presumably
 * written by another thread -- confirm this unsynchronised read is acceptable. */
932 while(management_thread_active != 0) {
933 apr_sleep(10 * 1000);
937 error_log(LOG_NOTICE, NULL, "ModSecurity Audit Log Collector %s terminating normally.", VERSION);
940 error_log(LOG_NOTICE, NULL, "ModSecurity Audit Log Collector %s terminating with error %d", VERSION, rc);
943 if (error_log_fd != NULL) {
944 apr_file_flush(error_log_fd);
/*
 * Logs which signal was received. The messages show that SIGINT and SIGTERM
 * lead to shutdown, while SIGHUP, SIGALRM and SIGTSTP are ignored.
 * NOTE(review): if this runs in real signal-handler context, error_log() uses
 * formatting and file I/O that are not async-signal-safe -- confirm how it is
 * installed.
 */
954 static int handle_signals(int signum)
958 error_log(LOG_NOTICE, NULL, "Caught SIGINT, shutting down.");
961 error_log(LOG_NOTICE, NULL, "Caught SIGTERM, shutting down.");
965 error_log(LOG_NOTICE, NULL, "Caught SIGHUP, ignored.");
966 /* ENH: reload config? */
969 error_log(LOG_DEBUG, NULL, "Caught SIGALRM, ignored.");
972 error_log(LOG_DEBUG, NULL, "Caught SIGTSTP, ignored.");
/* Two variants of the fallback message exist; the first also prints the
 * signal description. */
977 error_log(LOG_NOTICE, NULL, "Caught unexpected signal %d: %s", signum, apr_signal_description_get(signum));
979 error_log(LOG_NOTICE, NULL, "Caught unexpected signal %d", signum);
983 return 0; /* should never reach */
988 * This function is invoked by Curl to read the response
989 * body. Since we don't care about the response body the function
990 * pretends it is retrieving data where it isn't.
/*
 * cURL write callback. stream is treated as a status buffer: when its first
 * two bytes are 0 and 1 (presumably the marker that thread_worker() writes into
 * response_buf), status text is copied out of the received data and then
 * URL-decoded in place. It always reports size * nmemb bytes as consumed, so
 * the response body itself is discarded.
 */
992 size_t curl_writefunction(void *ptr, size_t size, size_t nmemb, void *stream)
994 unsigned char *data = (unsigned char *)ptr;
995 unsigned char *status = (unsigned char *)stream;
997 /* Grab the status line text from the first line of output */
998 if ((status[0] == 0) && (status[1] == 1)) {
1002 status[1] = 0; /* reset hidden init flag */
/* NOTE(review): the scan is bounded by STATUSBUF_SIZE, not by size * nmemb.
 * If fewer bytes were delivered and none of them is '\r', data[i] is read past
 * the received chunk -- confirm and consider bounding by both. */
1004 for (i = 0, j = 0; i < STATUSBUF_SIZE; i++) {
1005 /* We found a line ending so we are done */
1006 if ( data[i] == '\r' ) {
1009 /* Skip to after the first space (where msg is) */
1011 if ((ismsg == 1) && !isspace(data[i])) {
1014 else if (isspace(data[i])) {
1020 /* Copy data (msg) from data to status */
1021 status[j++] = data[i];
1024 urldecode_inplace(status, j);
1028 return (size * nmemb);
1033 * This function is invoked by Curl whenever it has something
1034 * to say. We forward its messages to the error log at level
/*
 * cURL debug callback. ourdata is the worker's apr_thread_t, used both as the
 * thread tag in the log and as the source of the pool for escaped copies.
 * Informational text is logged at LOG_DEBUG; headers and data in either
 * direction are logged at LOG_DEBUG2. Returns 0 immediately when the
 * configured level is below LOG_DEBUG.
 */
1037 int curl_debugfunction(CURL *curl, curl_infotype infotype, char *data, size_t datalen, void *ourdata)
1039 apr_size_t i, effectivelen;
1040 apr_thread_t *thread = (apr_thread_t *)ourdata;
1042 if (error_log_level < LOG_DEBUG) return 0;
/* effectivelen starts as the full length; the loop looks for the first CR or
 * LF, presumably to cut the logged text there. */
1044 effectivelen = datalen;
1045 for(i = 0; i < datalen; i++) {
1046 if ((data[i] == 0x0a)||(data[i] == 0x0d)) {
1052 if (error_log_level >= LOG_DEBUG) {
1053 if (infotype == CURLINFO_TEXT) {
1054 error_log(LOG_DEBUG, thread, "CURL: %s", _log_escape(apr_thread_pool_get(thread), data, effectivelen));
1057 if (error_log_level >= LOG_DEBUG2) {
1058 if (infotype == CURLINFO_HEADER_IN) {
1059 error_log(LOG_DEBUG2, thread, "CURL: HEADER_IN %s", _log_escape(apr_thread_pool_get(thread), data, effectivelen));
1061 else if (infotype == CURLINFO_HEADER_OUT) {
1062 error_log(LOG_DEBUG2, thread, "CURL: HEADER_OUT %s", _log_escape(apr_thread_pool_get(thread), data, effectivelen));
1064 else if (infotype == CURLINFO_DATA_IN) {
1065 error_log(LOG_DEBUG2, thread, "CURL: DATA_IN %s", _log_escape(apr_thread_pool_get(thread), data, effectivelen));
1067 else if (infotype == CURLINFO_DATA_OUT) {
1068 error_log(LOG_DEBUG2, thread, "CURL: DATA_OUT %s", _log_escape(apr_thread_pool_get(thread), data, effectivelen));
1077 * Initialise the necessary resources and structures.
/*
 * Creates the runtime structures: the entry queue, the in-progress hash, the
 * global and thread mutexes, one pre-configured cURL handle per allowed
 * connection, and the two compiled regular expressions.
 */
1079 static void logc_init(void)
1082 apr_status_t rc = 0;
1083 const char *errptr = NULL;
1086 queue = apr_array_make(pool, 64, sizeof(entry_t *));
1087 if (queue == NULL) {
1088 error_log(LOG_ERROR, NULL, MEMALLOC_ERROR_MSG);
1092 in_progress = apr_hash_make(pool);
1093 if (in_progress == NULL) {
1094 error_log(LOG_ERROR, NULL, MEMALLOC_ERROR_MSG);
/* The global mutex is backed by the configured lock file. */
1098 if ((rc = apr_global_mutex_create(&gmutex, lockfile, APR_LOCK_DEFAULT, pool)) != APR_SUCCESS) {
1099 error_log(LOG_ERROR, NULL, "Failed to create global mutex: %s",
1100 apr_strerror(rc, errstr, 1024));
1104 if ((rc = apr_thread_mutex_create(&mutex, APR_THREAD_MUTEX_UNNESTED, pool)) != APR_SUCCESS) {
1105 error_log(LOG_ERROR, NULL, "Failed to create thread mutex: %s",
1106 apr_strerror(rc, errstr, 1024));
1112 curl_handles = apr_array_make(pool, max_connections, sizeof(CURL *));
1113 if (curl_handles == NULL) {
1114 error_log(LOG_ERROR, NULL, MEMALLOC_ERROR_MSG);
1118 /* Initialise a number of Curl handles. */
1119 for(i = 0; i < max_connections; i++) {
1122 /* Create cURL handle. */
/* NOTE(review): no NULL check on the curl_easy_init() result is visible before
 * the handle is configured -- confirm. */
1123 curl = curl_easy_init();
1125 /* Pre-configure the handle. */
1126 curl_easy_setopt(curl, CURLOPT_UPLOAD, TRUE);
1127 curl_easy_setopt(curl, CURLOPT_ERRORBUFFER, (char *)NULL);
1128 curl_easy_setopt(curl, CURLOPT_URL, console_uri);
1129 curl_easy_setopt(curl, CURLOPT_HTTPAUTH, CURLAUTH_BASIC);
/* NOTE(review): peer and host certificate verification are both switched off,
 * so the console's identity is not authenticated even though Basic credentials
 * are sent -- confirm this is intended. */
1130 curl_easy_setopt(curl, CURLOPT_SSL_VERIFYPEER, FALSE);
1131 curl_easy_setopt(curl, CURLOPT_SSL_VERIFYHOST, 0);
1132 /* SSLv3 works better overall as some servers have issues with TLS */
/* NOTE(review): SSLv3 is obsolete; current libcurl and TLS libraries may
 * refuse it -- revisit this setting. */
1133 curl_easy_setopt(curl, CURLOPT_SSLVERSION, CURL_SSLVERSION_SSLv3);
/* NOTE(review): libcurl documents numeric options as taking a long; plain int
 * literals such as 15 and 0 are passed through varargs here -- consider 15L / 0L. */
1134 curl_easy_setopt(curl, CURLOPT_CONNECTTIMEOUT, 15);
1135 curl_easy_setopt(curl, CURLOPT_NOSIGNAL, TRUE);
1136 curl_easy_setopt(curl, CURLOPT_HEADER, TRUE);
1138 curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, curl_writefunction);
1140 *(CURL **)apr_array_push(curl_handles) = curl;
/* Both patterns are compiled once, case-insensitively, and kept in globals. */
1143 logline_regex = pcre_compile(logline_pattern, PCRE_CASELESS, &errptr, &erroffset, NULL);
1144 if (logline_regex == NULL) {
1145 error_log(LOG_ERROR, NULL, "Failed to compile pattern: %s\n", logline_pattern);
1149 requestline_regex = pcre_compile(requestline_pattern, PCRE_CASELESS, &errptr, &erroffset, NULL);
1150 if (requestline_regex == NULL) {
1151 error_log(LOG_ERROR, NULL, "Failed to compile pattern: %s\n", requestline_pattern);
1158 * HACK: To allow two mlogcs running against a single dataset we use the
1159 * mtime as a flag for deletion.
1161 * 1) Check file date.
1162 * 2) If it is KEEP_ENTRIES_REMOVE_TIME, then remove the file.
1163 * 3) Otherwise set the date and let the other mlogc remove it.
/*
 * Coordinates deletion of the file fn between two collectors by using its
 * mtime as a flag. The file is opened and locked exclusively, then stat'ed;
 * if its mtime is not KEEP_ENTRIES_REMOVE_TIME the mtime is set to that value,
 * otherwise (per the log messages, presumably the other branch) the file is
 * removed.
 *
 * mp:     pool used for the file handle and the stat/remove calls.
 * thread: only used to tag log messages.
 * fn:     path of the file to flag or remove.
 */
1165 static void keep_entries_hack(apr_pool_t *mp, apr_thread_t *thread, const char *fn)
1167 apr_file_t *f = NULL;
1172 /* Opening for write as required for exclusive lock */
1173 if ((rc = apr_file_open(&f, fn, APR_READ|APR_WRITE|APR_APPEND, APR_OS_DEFAULT, mp)) != APR_SUCCESS) {
1174 error_log(LOG_ERROR, thread, "Could not open \"%s\": %s", fn, apr_strerror(rc, errstr, 1024));
/* Try a non-blocking lock first so that waiting can be logged, then block. */
1178 if ((rc = apr_file_lock(f, APR_FLOCK_EXCLUSIVE|APR_FLOCK_NONBLOCK)) != APR_SUCCESS) {
1179 error_log(LOG_DEBUG2, thread, "Waiting for lock on \"%s\": %s", fn, apr_strerror(rc, errstr, 1024));
1180 if ((rc = apr_file_lock(f, APR_FLOCK_EXCLUSIVE)) != APR_SUCCESS) {
1181 error_log(LOG_ERROR, thread, "Could not lock \"%s\": %s", fn, apr_strerror(rc, errstr, 1024));
1186 error_log(LOG_DEBUG2, thread, "Locked: %s", fn);
1188 /* For testing only */
1189 TEST_WITH_RAND_SLEEP(2);
1191 if ((rc = apr_stat(&finfo, fn, APR_FINFO_MIN, mp)) != APR_SUCCESS) {
1192 error_log(LOG_ERROR, thread, "Could not stat \"%s\": %s", fn, apr_strerror(rc, errstr, 1024));
1193 error_log(LOG_DEBUG2, thread, "Unlocked: %s", fn);
1198 if (error_log_level >= LOG_DEBUG) {
1199 error_log(LOG_DEBUG, thread, "STAT \"%s\" {uid=%d; gid=%d; size=%" APR_OFF_T_FMT "; csize=%" APR_OFF_T_FMT "; atime=%" APR_TIME_T_FMT "; ctime=%" APR_TIME_T_FMT "; mtime=%" APR_TIME_T_FMT "}", fn, finfo.user, finfo.group, finfo.size, finfo.csize, finfo.atime, finfo.ctime, finfo.mtime);
/* First visitor: flag the file by setting its mtime and leave it in place. */
1202 if (finfo.mtime != KEEP_ENTRIES_REMOVE_TIME) {
1203 error_log(LOG_DEBUG2, thread, "Set mtime: %s", fn);
1204 if ((rc = apr_file_mtime_set(fn, (apr_time_t)KEEP_ENTRIES_REMOVE_TIME, mp)) != APR_SUCCESS) {
1205 error_log(LOG_ERROR, thread, "Could not set mtime on \"%s\": %s", fn, apr_strerror(rc, errstr, 1024));
1207 error_log(LOG_DEBUG2, thread, "Unlocked: %s", fn);
1213 error_log(LOG_DEBUG, thread, "Removing: %s", fn);
1214 error_log(LOG_DEBUG2, thread, "Unlocked: %s", fn);
1216 apr_file_remove(fn, mp);
1221 * Worker thread. Works in a loop, fetching jobs from the queue,
1222 * until the queue is empty or it is otherwise told to quit.
1224 static void * APR_THREAD_FUNC thread_worker(apr_thread_t *thread, void *data)
1226 unsigned int loop_count = 0;
1227 CURL *curl = (CURL *)data;
1228 entry_t **entryptr = NULL;
1229 entry_t *entry = NULL;
1232 int capturevector[CAPTUREVECTORSIZE];
1235 struct curl_slist *headerlist = NULL;
1236 char curl_error_buffer[CURL_ERROR_SIZE] = "";
1237 int num_requests = 0;
1239 /* There is no need to do the sleep if this was an invalid entry
1240 * as the sleep is just to protect flooding the console server
1241 * with rapid requests. With an invalid entry we never hit the
1242 * server, so we should not delay processing the next event.
1247 error_log(LOG_DEBUG, thread, "Worker thread starting.");
1249 /* Each worker uses its own pool to manage memory. To avoid
1250 * memory leaks the pool is cleared after each processed
1253 apr_pool_create(&tpool, thread_pool);
1255 /* Process jobs in a queue until there are no more jobs to process. */
1259 /* Do we need to shut down? */
1261 error_log(LOG_DEBUG, thread, "We were told to shut down.");
1262 goto THREAD_SHUTDOWN;
1265 /* Is there a problem with the server? We need
1266 * to shut down if there is. Except that we don't
1267 * want to shut down if we were launched to investigate
1268 * if the server came back online (loop_count will be
1269 * zero in that case).
1271 if ((server_error == 1)&&(loop_count != 0)) {
1272 error_log(LOG_DEBUG, thread, "Shutting down due to server error.");
1273 goto THREAD_SHUTDOWN;
1278 /* Get a new entry, but only if we need one. */
1280 error_log(LOG_DEBUG, thread, "Worker fetch locking thread mutex.");
1281 if (APR_STATUS_IS_EBUSY(apr_thread_mutex_trylock(mutex))) {
1282 error_log(LOG_DEBUG, thread, "Worker fetch waiting on thread mutex.");
1283 apr_thread_mutex_lock(mutex);
1286 error_log(LOG_DEBUG, thread, "Worker fetch started.");
1288 /* Deal with the previous entry. */
1289 if (entry != NULL) {
1290 error_log(LOG_DEBUG, thread, "Removing previous entry from storage.");
1291 transaction_log(TXOUT, entry->line);
1293 /* Remove previous entry from storage. */
1294 apr_hash_set(in_progress, &entry->id, sizeof(entry->id), NULL);
1296 /* Release the memory it used to occupy. */
1297 free((void *)entry->line);
1302 error_log(LOG_DEBUG, thread, "Getting one entry from the queue.");
1304 /* Get one entry. */
1305 entryptr = (entry_t **)apr_array_pop(queue);
1306 if (entryptr == NULL) {
1307 error_log(LOG_DEBUG, thread, "Worker fetch unlocking thread mutex.");
1308 apr_thread_mutex_unlock(mutex);
1309 error_log(LOG_DEBUG, thread, "No more work for this thread, exiting.");
1311 goto THREAD_SHUTDOWN;
1315 apr_hash_set(in_progress, &entry->id, sizeof(entry->id), entry);
1317 error_log(LOG_DEBUG, thread, "Worker fetch completed.");
1319 error_log(LOG_DEBUG, thread, "Worker fetch unlocking thread mutex.");
1320 apr_thread_mutex_unlock(mutex);
1323 /* Send one entry. */
1325 error_log(LOG_DEBUG, thread, "Processing entry.");
1328 /* Keep track of requests processed if we need to */
1329 if (max_worker_requests > 0) {
1333 rc = pcre_exec(logline_regex, NULL, entry->line, entry->line_size, 0, 0,
1334 capturevector, CAPTUREVECTORSIZE);
1335 if (rc == PCRE_ERROR_NOMATCH) { /* No match. */
1336 error_log(LOG_WARNING, thread, "Invalid entry (failed to match regex): %s", _log_escape(tpool, entry->line, entry->line_size));
1340 else if (rc < 0) { /* Error condition. */
1341 error_log(LOG_WARNING, thread, "Invalid entry (PCRE error %d): %s", rc, _log_escape(tpool, entry->line, entry->line_size));
1345 else { /* We have a match. */
1346 char *uniqueid = NULL;
1347 char *auditlogentry = NULL;
1349 char *summary = NULL;
1350 char *credentials = NULL;
1352 error_log(LOG_DEBUG, thread, "Regular expression matched.");
1354 /* For testing only */
1355 TEST_WITH_RAND_SLEEP(2);
1357 uniqueid = apr_psprintf(tpool, "%.*s",
1358 (capturevector[2*13+1] - capturevector[2*13]), (entry->line + capturevector[2*13]));
1359 auditlogentry = apr_psprintf(tpool, "%s/%.*s", log_repository,
1360 (capturevector[2*15+1] - capturevector[2*15]), (entry->line + capturevector[2*15]));
1361 hash = apr_psprintf(tpool, "X-Content-Hash: %.*s",
1362 (capturevector[2*18+1] - capturevector[2*15]), (entry->line + capturevector[2*18]));
1363 summary = apr_psprintf(tpool, "X-ForensicLog-Summary: %s", entry->line);
1364 credentials = apr_psprintf(tpool, "%s:%s", sensor_username, sensor_password);
1366 rc = apr_stat(&finfo, auditlogentry, APR_FINFO_SIZE, tpool);
1367 if (rc == APR_SUCCESS) {
1369 char response_buf[STATUSBUF_SIZE];
1372 if (error_log_level >= LOG_DEBUG) {
1373 error_log(LOG_DEBUG, thread, "STAT \"%s\" {uid=%d; gid=%d; size=%" APR_OFF_T_FMT "; csize=%" APR_OFF_T_FMT "; atime=%" APR_TIME_T_FMT "; ctime=%" APR_TIME_T_FMT "; mtime=%" APR_TIME_T_FMT "}", auditlogentry, finfo.user, finfo.group, finfo.size, finfo.csize, finfo.atime, finfo.ctime, finfo.mtime);
1376 /* Initialize the response buffer with a hidden value */
1377 response_buf[0] = 0;
1378 response_buf[1] = 1;
1380 if (finfo.size == 0) {
1381 error_log(LOG_WARNING, thread, "File found (%" APR_SIZE_T_FMT " bytes), skipping.", finfo.size);
1384 goto THREAD_CLEANUP;
1387 error_log(LOG_DEBUG, thread, "File found (%" APR_SIZE_T_FMT " bytes), activating cURL.", finfo.size);
1391 curl_easy_setopt(curl, CURLOPT_VERBOSE, 1);
1392 curl_easy_setopt(curl, CURLOPT_DEBUGFUNCTION, curl_debugfunction);
1393 curl_easy_setopt(curl, CURLOPT_DEBUGDATA, thread);
1394 curl_easy_setopt(curl, CURLOPT_ERRORBUFFER, curl_error_buffer);
1395 curl_easy_setopt(curl, CURLOPT_USERPWD, credentials);
1396 curl_easy_setopt(curl, CURLOPT_WRITEDATA, (char *)response_buf);
1398 headerlist = curl_slist_append(headerlist, "Expect:");
1399 headerlist = curl_slist_append(headerlist, hash);
1400 headerlist = curl_slist_append(headerlist, summary);
1401 curl_easy_setopt(curl, CURLOPT_HTTPHEADER, headerlist);
1403 hd_src = fopen(auditlogentry, "rb");
1404 if (hd_src == NULL) {
1405 error_log(LOG_WARNING, thread, "Invalid entry (failed to open file for reading): %s", auditlogentry);
1408 goto THREAD_CLEANUP;
1411 curl_easy_setopt(curl, CURLOPT_READDATA, hd_src);
1412 curl_easy_setopt(curl, CURLOPT_INFILESIZE_LARGE, finfo.size);
1415 curl_easy_setopt(curl, CURLOPT_READFUNCTION, read_callback);
1418 res = curl_easy_perform(curl);
1423 long response_code = 0;
1425 res = curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &response_code);
1426 error_log(LOG_DEBUG, thread, "Request returned with status \"%ld %s\": %s", response_code, response_buf, uniqueid);
1429 if (response_code == 0) {
1430 /* Assume problem with connection */
1431 error_log(LOG_WARNING, thread, "Flagging server as errored after failure to retrieve response code for entry %s (cURL code %d): Possible SSL negotiation error",
1433 apr_sleep(1000 * 1000);
1436 server_error_last_check_time = apr_time_now();
1438 else if (res != 0) {
1439 error_log(LOG_WARNING, thread, "Flagging server as errored after failure to retrieve response code for entry %s (cURL code %d): %s",
1440 uniqueid, res, curl_error_buffer);
1441 apr_sleep(1000 * 1000);
1444 server_error_last_check_time = apr_time_now();
1447 if (response_code == 200) {
1448 double total_time, upload_size;
1450 if (server_error == 1) {
1451 error_log(LOG_NOTICE, thread, "Clearing the server error flag after successful entry submission: %s", uniqueid);
1454 server_error_last_check_time = 0;
1456 curl_easy_getinfo(curl, CURLINFO_TOTAL_TIME, &total_time);
1457 curl_easy_getinfo(curl, CURLINFO_SIZE_UPLOAD, &upload_size);
1459 if (!keep_entries) {
1460 error_log(LOG_DEBUG, thread, "Removing: %s", auditlogentry);
1461 apr_file_remove(auditlogentry, tpool);
1463 else if (keep_entries == KEEP_ENTRIES_REMOVE_HACK) {
1464 keep_entries_hack(tpool, thread, auditlogentry);
1467 error_log(LOG_NOTICE, thread, "Entry completed (%.3f seconds, %.0f bytes): %s",
1468 total_time, upload_size,
1472 else if (response_code == 409) {
1473 /* Assume problem with audit log entry. */
1474 error_log(LOG_WARNING, thread, "Failed to submit entry with \"409 %s\": %s",
1475 response_buf, uniqueid);
1479 /* Assume problem with server. */
1480 error_log(LOG_WARNING, thread, "Flagging server as errored after failure to submit entry %s with HTTP response code %ld: %s",
1481 uniqueid, response_code, response_buf);
1483 server_error_last_check_time = apr_time_now();
1488 else { /* Something isn't right. */
1489 error_log(LOG_WARNING, thread, "Flagging server as errored after failure to submit entry %s (cURL code %d): %s", uniqueid, res, curl_error_buffer);
1491 server_error_last_check_time = apr_time_now();
1497 error_log(LOG_WARNING, thread, "Invalid entry (file not found %d): %s", rc, auditlogentry);
1502 /* If we are tracking num_requests, then shutdown if we are
1503 * over our threshold.
1505 if (num_requests && (num_requests >= max_worker_requests)) {
1506 error_log(LOG_NOTICE, thread, "Reached max requests (%d) for this worker, exiting.", max_worker_requests);
1508 goto THREAD_SHUTDOWN;
1514 /* Sleep if we sent data to the server so we do not flood */
1515 /* ENH: Need to sleep for 1ms in a loop checking for shutdown */
1516 if ((nodelay == 0) && (transaction_delay > 0)) {
1517 error_log(LOG_DEBUG, thread, "Sleeping for %d msec.", transaction_delay);
1518 apr_sleep(transaction_delay * 1000);
1521 if (headerlist != NULL) {
1522 curl_slist_free_all(headerlist);
1526 apr_pool_clear(tpool);
1528 error_log(LOG_DEBUG, thread, "Worker processing completed.");
1533 error_log(LOG_DEBUG, thread, "Worker shutdown locking thread mutex.");
1534 if (APR_STATUS_IS_EBUSY(apr_thread_mutex_trylock(mutex))) {
1535 error_log(LOG_DEBUG, thread, "Worker shutdown waiting on thread mutex.");
1536 apr_thread_mutex_lock(mutex);
1539 /* Deal with the previous entry, if any. */
1540 if (entry != NULL) {
1541 apr_hash_set(in_progress, &entry->id, sizeof(entry->id), NULL);
1543 if (take_new == 0) { /* Not done. */
1544 *(entry_t **)apr_array_push(queue) = entry;
1547 transaction_log(TXOUT, entry->line);
1548 free((void *)entry->line);
1555 /* Return curl handle to the pool for reuse. */
1556 *(CURL **)apr_array_push(curl_handles) = curl;
1558 /* No more work, exit. */
1561 error_log(LOG_DEBUG, thread, "Worker shutdown unlocking thread mutex.");
1562 apr_thread_mutex_unlock(mutex);
1564 apr_pool_destroy(tpool);
1566 error_log(LOG_DEBUG, thread, "Worker thread completed.");
1568 apr_thread_exit(thread, 0);
/*
 * create_new_worker: start one worker thread running thread_worker and hand
 * it a cURL handle taken from curl_handles.
 *
 * Parameters:
 *   lock - not referenced on any line visible in this listing.
 *          NOTE(review): presumably selects whether this function takes and
 *          releases the thread mutex itself; confirm against the full file.
 *
 * Returns: nothing.
 *
 * Globals used on the visible lines: mutex, current_workers,
 * max_connections, thread_pool, curl_handles.
 *
 * NOTE(review): this listing appears to have lost some short lines (braces,
 * an else, the declaration of rc), so the exact nesting of the statements
 * below should be checked against the complete source.
 */
1575 * Creates one new worker, giving it one of the available
1576 * Curl handles to work with.
1578 static void create_new_worker(int lock)
1580 apr_thread_t *thread = NULL;
1581 CURL **curlptr = NULL;
1584 error_log(LOG_DEBUG, NULL, "Worker creation locking thread mutex.");
/* Try the mutex without blocking first, so that contention gets logged
 * before the blocking lock call is made.
 */
1585 if (APR_STATUS_IS_EBUSY(apr_thread_mutex_trylock(mutex))) {
1586 error_log(LOG_DEBUG, NULL, "Worker creation waiting on thread mutex.");
1587 apr_thread_mutex_lock(mutex);
1591 error_log(LOG_DEBUG, NULL, "Worker creation started.");
1593 /* A sanity check: this part executes under lock and
1594 * we want to make *sure* we don't create more threads
1595 * than we are allowed.
1597 if (current_workers >= max_connections) {
1599 error_log(LOG_DEBUG, NULL, "Worker creation unlocking thread mutex.");
1600 apr_thread_mutex_unlock(mutex);
1605 /* Cleanup thread pool when idle */
1606 if (current_workers <= 0) {
1607 if (thread_pool != NULL) {
1608 error_log(LOG_DEBUG, NULL, "Destroying thread_pool.");
1609 apr_pool_destroy(thread_pool);
1612 error_log(LOG_DEBUG, NULL, "Creating thread_pool.");
1613 apr_pool_create(&thread_pool, NULL);
/* Take a cURL handle for the new worker. The NULL test below covers the
 * case where no handle is left; that case is reported further down as an
 * internal error.
 */
1616 curlptr = (CURL **)apr_array_pop(curl_handles);
1617 if (curlptr != NULL) {
1618 apr_threadattr_t *thread_attrs;
/* The worker is created detached, with a requested stack size of 1024.
 * NOTE(review): the results of these three attribute calls are not checked,
 * so a rejected stack size would go unnoticed; confirm 1024 is intended.
 */
1621 apr_threadattr_create(&thread_attrs, thread_pool);
1622 apr_threadattr_detach_set(thread_attrs, 1);
1623 apr_threadattr_stacksize_set(thread_attrs, 1024);
/* The popped cURL handle is passed to thread_worker as its data argument,
 * and the thread is allocated out of thread_pool.
 */
1625 rc = apr_thread_create(&thread, thread_attrs, thread_worker, *curlptr, thread_pool);
1626 if (rc != APR_SUCCESS) {
1628 error_log(LOG_DEBUG, NULL, "Worker creation unlocking thread mutex.");
1629 apr_thread_mutex_unlock(mutex);
1631 error_log(LOG_ERROR, NULL, "Failed to create new worker thread: %d", rc);
1639 error_log(LOG_DEBUG, NULL, "Worker creation unlocking thread mutex.");
1640 apr_thread_mutex_unlock(mutex);
1642 error_log(LOG_ERROR, NULL, "No more cURL handles (Internal Error).");
1646 error_log(LOG_DEBUG, NULL, "Worker creation completed: %pp", thread);
1649 error_log(LOG_DEBUG, NULL, "Worker creation unlocking thread mutex.");
1650 apr_thread_mutex_unlock(mutex);
/*
 * thread_manager: entry point of the management thread.
 *
 * Work done on the lines visible in this listing:
 *   - shutdown path: wait until current_workers drops to zero, run a final
 *     transaction_checkpoint() if have_read_data is set, clear
 *     management_thread_active and leave through apr_thread_exit();
 *   - pacing: while less than MANAGER_SLEEP has elapsed since last, nap for
 *     MANAGER_SUBSLEEP at a time (the file header describes the manager as
 *     waking up every five seconds);
 *   - worker creation through create_new_worker(1): once no worker is
 *     running and more than server_error_timeout seconds have passed since
 *     server_error_last_check_time, and whenever fewer than max_connections
 *     workers exist while the queue holds more entries than there are
 *     workers;
 *   - periodic checkpoint once more than checkpoint_interval seconds have
 *     passed since checkpoint_time_last.
 *
 * Parameters:
 *   thread - this thread; passed to error_log() and apr_thread_exit().
 *   data   - not referenced on the visible lines.
 *
 * NOTE(review): this listing appears to have lost some short lines (the
 * loop header, the stop condition, the update of last, braces and else
 * keywords), so the nesting of the statements below should be checked
 * against the complete source.
 */
1656 * This function implements the management thread.
1658 static void * APR_THREAD_FUNC thread_manager(apr_thread_t *thread, void *data)
1660 apr_time_t last = 0;
1663 error_log(LOG_DEBUG, thread, "Management thread: Starting.");
1666 now = apr_time_now();
1668 /* Should we stop running? */
1670 /* We need to be last */
1671 error_log(LOG_DEBUG, thread, "Management thread: Waiting for worker threads to finish.");
/* Poll with a 10 * 1000 microsecond sleep until no worker is left. */
1672 while(current_workers > 0) {
1673 apr_sleep(10 * 1000);
/* The final checkpoint is skipped unless have_read_data is set. */
1676 if (have_read_data) {
1677 error_log(LOG_NOTICE, thread, "Running final transaction checkpoint.");
1678 transaction_checkpoint();
1681 error_log(LOG_DEBUG, thread, "Management thread: Exiting.");
/* Mark the management thread as no longer active before exiting. */
1682 management_thread_active = 0;
1683 apr_thread_exit(thread, 0);
1686 /* Sleep for a while, but wake up often to check running status */
/* The last > 0 test keeps the very first pass from sleeping.
 * NOTE(review): the statement that updates last is not visible in this
 * listing; confirm where it is set.
 */
1687 if ((last > 0) && ((now - last) < MANAGER_SLEEP)) {
1688 apr_sleep(MANAGER_SUBSLEEP);
1693 error_log(LOG_DEBUG2, thread, "Management thread: Processing");
1695 /* When the server is flagged errored we need to
1696 * create a worker thread from time to time to
1700 if ((current_workers == 0)&&
1701 (apr_time_sec(now - server_error_last_check_time) > server_error_timeout))
1703 server_error_last_check_time = now;
1704 error_log(LOG_DEBUG, thread, "Management thread: Creating worker thread to investigate server.");
1705 create_new_worker(1);
1709 if ((current_workers < max_connections)&&(queue->nelts > current_workers)) {
1710 error_log(LOG_DEBUG, thread, "Management thread: Creating worker thread to catch up with the queue.");
1711 create_new_worker(1);
1715 /* Initiate a transaction log checkpoint if enough time passed since the last one. */
1716 if (apr_time_sec(now - checkpoint_time_last) > checkpoint_interval) {
1717 error_log(LOG_DEBUG, thread, "Management thread: Initiating a checkpoint"
1718 " (previous was %" APR_TIME_T_FMT " seconds ago).", apr_time_sec(now - checkpoint_time_last));
1719 checkpoint_time_last = now;
1720 transaction_checkpoint();
/* NOTE(review): presumably the else branch of the test above; it only
 * reports, at DEBUG2 level, how long ago the last checkpoint ran.
 */
1723 error_log(LOG_DEBUG2, thread, "Management thread: Last checkpoint was %" APR_TIME_T_FMT " seconds ago.",
1724 apr_time_sec(now - checkpoint_time_last));
/*
 * thread_signals: entry point of the signal handling thread.
 *
 * Hands the thread over to apr_signal_thread(handle_signals). The APR
 * documentation describes that call as making the current thread listen for
 * signals and loop forever, invoking the given function for each signal, so
 * the code after it only runs once the call has come back.
 *
 * Parameters:
 *   thread - this thread; only used for logging on the visible lines.
 *   data   - not referenced on the visible lines.
 *
 * A non-success status is only reported at LOG_DEBUG level.
 *
 * NOTE(review): the declaration of rc, the braces and the return statement
 * are not visible in this listing; check against the complete source.
 */
1733 * Thread to handle all signals
1735 static void * APR_THREAD_FUNC thread_signals(apr_thread_t *thread, void *data)
1739 error_log(LOG_DEBUG, thread, "Signal thread: Starting.");
1740 rc = apr_signal_thread(handle_signals);
1741 if (rc != APR_SUCCESS) {
1742 error_log(LOG_DEBUG, thread, "Signal thread: Error %d", rc);
/*
 * receive_loop: read log entries from stdin and add them to the queue.
 *
 * What the visible lines do:
 *   - open stdin through apr_file_open_stdin() and allocate a buffer of
 *     PIPE_BUF_SIZE + 1 bytes from pool, the extra byte holding a
 *     terminating NUL;
 *   - read at most PIPE_BUF_SIZE - next bytes at a time into buf + next;
 *   - scan for the 0x0a end-of-line byte and, for a complete line, record it
 *     with transaction_log(TXIN, ...) and pass it to add_entry(..., 1);
 *   - shift an unfinished trailing event to the front of the buffer, or
 *     drop the data when the buffer fills up without an end-of-line;
 *   - recycle tmp_pool, which backs the _log_escape() calls used for
 *     logging;
 *   - after the loop, wait for the queue to drain while opt_force is set
 *     and the server is not flagged as errored.
 *
 * Takes no parameters and returns nothing.
 *
 * Index variables: evnt marks the first byte of the current event, curr the
 * byte being examined, next the first unused byte of buf.
 *
 * NOTE(review): this listing appears to have lost some short lines; rc,
 * count, done and errstr are used without a visible declaration, and several
 * braces, else keywords and single statements are missing. Check the nesting
 * and the notes below against the complete source.
 */
1751 * The main loop where we receive log entries from
1752 * Apache and add them to the queue, sometimes creating
1753 * new worker threads to handle them.
1755 static void receive_loop(void) {
1756 apr_file_t *fd_stdin;
1757 apr_size_t nbytes = PIPE_BUF_SIZE;
1758 char *buf = apr_palloc(pool, PIPE_BUF_SIZE + 1);
1760 apr_size_t evnt = 0; /* Index in buf to first event char */
1761 apr_size_t curr = 0; /* Index in buf to current processing char */
1762 apr_size_t next = 0; /* Index in buf to next unused char */
1765 int buffered_events = 0;
1767 apr_pool_t *tmp_pool;
1770 if (apr_file_open_stdin(&fd_stdin, pool) != APR_SUCCESS) {
1771 error_log(LOG_ERROR, NULL, "Unable to open stdin for reading");
1775 /* Always want this NUL terminated */
1776 buf[PIPE_BUF_SIZE] = '\0';
1778 apr_pool_create(&tmp_pool, NULL);
1780 /* Loop forever receiving entries from stdin. */
/* The loop keeps running until done is set and every buffered byte has
 * been scanned.
 */
1781 while(!done || (curr < next)) {
1784 if (error_log_level >= LOG_DEBUG2) {
1785 error_log(LOG_DEBUG2, NULL, "Internal state: [evnt \"%" APR_SIZE_T_FMT "\"][curr \"%" APR_SIZE_T_FMT "\"][next \"%" APR_SIZE_T_FMT "\"][nbytes \"%" APR_SIZE_T_FMT "\"]", evnt, curr, next, nbytes);
1788 /* If we are not done and have the space, read more */
1789 if (!done && (nbytes > 0)) {
1790 buffered_events = 0;
/* Only ask for as much as still fits behind the data already buffered. */
1791 nbytes = PIPE_BUF_SIZE - next;
1792 rc = apr_file_read(fd_stdin, (buf + next), &nbytes);
1793 if (rc != APR_SUCCESS) {
1794 if (have_read_data) {
1795 error_log(LOG_NOTICE, NULL, "No more data to read, emptying buffer: %s", apr_strerror(rc, errstr, 1024));
/* At exactly LOG_DEBUG only the byte count is logged; the DEBUG2 message
 * that follows also includes the escaped data.
 */
1801 if (error_log_level == LOG_DEBUG) {
1802 error_log(LOG_DEBUG, NULL, "Read %" APR_SIZE_T_FMT " bytes from pipe", nbytes);
1805 error_log(LOG_DEBUG2, NULL, "Read %" APR_SIZE_T_FMT " bytes from pipe: `%s'", nbytes, _log_escape(tmp_pool, (buf + next), nbytes));
1813 * Each chunk of data we receive can contain one or more lines for
1814 * which we need to find the EOL marker and then queue the event
1815 * up to that. So, find/queue as many lines in the buffer as we
1816 * can. Any remaining data will get shifted back to the beginning
1817 * of the buffer and the buffer size for the next read adjusted.
1819 while(curr < next) {
1820 /* Look for EOL so we can parse the event */
1821 while((curr < next) && (buf[curr] != 0x0a)) {
1824 if (buf[curr] == 0x0a) {
1827 /* We may have to drop this one if it previously failed */
1829 error_log(LOG_ERROR, NULL, "Dropping remaining portion of failed event: `%s'", _log_escape(tmp_pool, (buf + evnt), (curr - evnt)));
/* From here on buf + evnt is logged and queued as a C string.
 * NOTE(review): the statement that replaces the end-of-line byte with a
 * NUL is not visible in this listing; confirm it runs before these calls.
 */
1833 transaction_log(TXIN, buf + evnt);
1834 error_log(LOG_DEBUG2, NULL, "Received audit log entry (count %lu queue %d workers %d): %s",
1835 entry_counter, queue->nelts, current_workers, _log_escape(tmp_pool, (buf + evnt), strlen(buf + evnt)));
1836 add_entry(buf + evnt, 1);
1840 /* Advance indexes to next event in buf */
1841 evnt = curr = curr + 1;
1844 error_log(LOG_DEBUG2, NULL, "Event buffer contains partial event: `%s'", _log_escape(tmp_pool, (buf + evnt), (next - evnt)));
1850 if (buffered_events > 0) {
1851 error_log(LOG_DEBUG, NULL, "Processed %d entries from buffer.", buffered_events);
1853 /* Move the unused portion of the buffer to the beginning */
/* NOTE(review): next is used as the byte count of the move. The statements
 * that rebase next and curr by evnt are not visible in this listing; confirm
 * they run before this call, since otherwise the move would read past the
 * valid data.
 */
1856 memmove(buf, (buf + evnt), next);
1858 error_log(LOG_DEBUG2, NULL, "Shifted buffer back %" APR_SIZE_T_FMT " and offset %" APR_SIZE_T_FMT " bytes for next read: `%s'", evnt, next, _log_escape(tmp_pool, buf, next));
1862 else if (next == PIPE_BUF_SIZE) {
1864 * There is a chance we could fill the buffer, but not have finished
1865 * reading the event (no EOL yet), so we need to say so and drop
1866 * all data until we find the end of the event that is too large.
1870 error_log(LOG_ERROR, NULL, "Event continuation too large, dropping it as well: `%s'", _log_escape(tmp_pool, buf, PIPE_BUF_SIZE));
1873 error_log(LOG_ERROR, NULL, "Event too large, dropping event: `%s'", _log_escape(tmp_pool, buf, PIPE_BUF_SIZE));
1876 /* Rewind buf and mark that we need to drop up to the next event */
1877 evnt = curr = next = 0;
/* Space left for the next read. A value of zero makes the nbytes > 0 test
 * at the top of the loop skip the read on the next pass.
 */
1881 nbytes = PIPE_BUF_SIZE - next;
/* After more than 1000 passes tmp_pool is destroyed and created afresh.
 * NOTE(review): the apr_pool_clear() call further down is presumably the
 * else branch taken on all other passes; confirm.
 */
1883 if (count++ > 1000) {
1885 error_log(LOG_DEBUG, NULL, "Recycling tmp_pool.");
1886 apr_pool_destroy(tmp_pool);
1887 apr_pool_create(&tmp_pool, NULL);
1890 apr_pool_clear(tmp_pool);
1894 /* Wait for queue to empty if specified */
1895 if ((server_error == 0) && (opt_force != 0) && (queue->nelts > 0)) {
1896 error_log(LOG_NOTICE, NULL, "Waiting for queue to empty (%d active).", queue->nelts);
/* Poll with a 10 * 1000 microsecond sleep; the wait also ends as soon as
 * the server gets flagged as errored, which is why the queue may still
 * hold entries afterwards.
 */
1897 while ((server_error == 0) && (opt_force != 0) && (queue->nelts > 0)) {
1898 apr_sleep(10 * 1000);
1900 if (queue->nelts > 0) {
1901 error_log(LOG_ERROR, NULL, "Could not empty queue (%d active).", queue->nelts);
/*
 * start_management_thread: launch thread_manager on a detached thread.
 *
 * The thread attributes and the thread itself are allocated from pool, and
 * thread_manager receives NULL as its data argument.
 *
 * management_thread_active is set to 1 before the thread is created and is
 * put back to 0 if apr_thread_create() does not return APR_SUCCESS; the
 * failure is logged at LOG_ERROR level.
 *
 * Takes no parameters and returns nothing.
 *
 * NOTE(review): the results of the three attribute calls are not checked,
 * so a rejected stack size request of 1024 would go unnoticed. The
 * declaration of rc and any further statements of the failure branch are
 * not visible in this listing; check against the complete source.
 */
1908 * Creates the management thread.
1910 static void start_management_thread(void)
1912 apr_thread_t *thread = NULL;
1913 apr_threadattr_t *thread_attrs;
1916 apr_threadattr_create(&thread_attrs, pool);
1917 apr_threadattr_detach_set(thread_attrs, 1);
1918 apr_threadattr_stacksize_set(thread_attrs, 1024);
/* Raise the flag first so it is already set when thread_manager starts. */
1920 management_thread_active = 1;
1922 rc = apr_thread_create(&thread, thread_attrs, thread_manager, NULL, pool);
1923 if (rc != APR_SUCCESS) {
1924 error_log(LOG_ERROR, NULL, "Failed to create new management thread: %d", rc);
1925 management_thread_active = 0;
/*
 * start_signal_thread: launch thread_signals on a detached thread.
 *
 * The thread attributes and the thread itself are allocated from pool, and
 * thread_signals receives NULL as its data argument. If apr_thread_create()
 * does not return APR_SUCCESS the status is logged at LOG_ERROR level.
 *
 * Takes no parameters and returns nothing.
 *
 * NOTE(review): the results of the three attribute calls are not checked,
 * so a rejected stack size request of 1024 would go unnoticed. The
 * declaration of rc and any further statements of the failure branch are
 * not visible in this listing; check against the complete source.
 */
1931 * Creates a thread to handle all signals
1933 static void start_signal_thread(void)
1935 apr_thread_t *thread = NULL;
1936 apr_threadattr_t *thread_attrs;
1939 apr_threadattr_create(&thread_attrs, pool);
1940 apr_threadattr_detach_set(thread_attrs, 1);
1941 apr_threadattr_stacksize_set(thread_attrs, 1024);
1943 rc = apr_thread_create(&thread, thread_attrs, thread_signals, NULL, pool);
1944 if (rc != APR_SUCCESS) {
1945 error_log(LOG_ERROR, NULL, "Failed to create new signal thread: %d", rc);
/*
 * usage: print the command line help to stderr.
 *
 * The output consists of a banner carrying VERSION, the invocation synopsis
 * (options followed by the path of the configuration file) and the list of
 * the -f, -v and -h options.
 *
 * Takes no parameters and returns nothing; it only writes to stderr.
 *
 * NOTE(review): the closing brace of this function is not visible in this
 * listing; check against the complete source.
 */
1954 static void usage(void) {
1955 fprintf(stderr, "ModSecurity Log Collector (mlogc) v%s\n", VERSION);
1956 fprintf(stderr, " Usage: mlogc [options] /path/to/the/mlogc.conf\n");
1957 fprintf(stderr, "\n");
1958 fprintf(stderr, " Options:\n");
1959 fprintf(stderr, " -f Force depletion of queue on exit\n");
1960 fprintf(stderr, " -v Version information\n");
1961 fprintf(stderr, " -h This help\n\n");
/*
 * version: print version information to stderr.
 *
 * After a banner carrying VERSION, one line each is printed for APR, PCRE
 * and cURL. On every line the compiled value comes from a compile time
 * macro of that library (APR_VERSION_STRING, PCRE_MAJOR and PCRE_MINOR,
 * LIBCURL_VERSION) and the loaded value from a call made at run time
 * (apr_version_string, pcre_version, curl_version), so a mismatch between
 * the headers used for the build and the library in use shows up here.
 *
 * Takes no parameters and returns nothing; it only writes to stderr.
 *
 * NOTE(review): the closing brace of this function is not visible in this
 * listing; check against the complete source.
 */
1967 static void version(void) {
1968 fprintf(stderr, "ModSecurity Log Collector (mlogc) v%s\n", VERSION);
1969 fprintf(stderr, " APR: compiled=\"%s\"; loaded=\"%s\"\n", APR_VERSION_STRING, apr_version_string());
1970 fprintf(stderr, " PCRE: compiled=\"%d.%d\"; loaded=\"%s\"\n", PCRE_MAJOR, PCRE_MINOR, pcre_version());
1971 fprintf(stderr, " cURL: compiled=\"%s\"; loaded=\"%s\"\n", LIBCURL_VERSION, curl_version());
1972 fprintf(stderr, "\n");
1976 * This is the main entry point.
1978 int main(int argc, const char * const argv[]) {
1982 apr_app_initialize(&argc, &argv, NULL);
1983 atexit(apr_terminate);
1985 curl_global_init(CURL_GLOBAL_ALL);
1986 atexit(logc_cleanup);
1988 logc_pid = getpid();
1989 apr_pool_create(&pool, NULL);
1990 apr_pool_create(&recv_pool, NULL);
1993 apr_setup_signal_thread();
1995 apr_signal(SIGINT, handle_signals);
1996 apr_signal(SIGTERM, handle_signals);
2004 /* Commandline opts */
2005 rc = apr_getopt_init(&opt, pool, argc, argv);
2006 if (rc != APR_SUCCESS) {
2014 rc = apr_getopt(opt, CMDLINE_OPTS, &ch, &val);
2034 } while (rc != APR_EOF);
2036 /* Conf file is last */
2037 conffile = argv[argc - 1];
2039 read_configuration();
2040 init_configuration();
2043 transaction_log_init();
2048 start_management_thread();
2050 start_signal_thread();
2053 /* Process stdin until EOF */