Ignore:
Timestamp:
05/16/14 00:29:33 (10 years ago)
Author:
atzm
Message:

improve performance on multi-queue environment

File:
1 edited

Legend:

Unmodified
Added
Removed
  • ksyslog/trunk/ksyslog.c

    r270 r271  
    1313#include <linux/fsnotify.h> 
    1414#include <linux/proc_fs.h> 
    15 #include <linux/u64_stats_sync.h> 
    1615#include <linux/percpu.h> 
    1716#include <net/udp.h> 
     
    1918#include "ksyslog.h" 
    2019 
    21 static struct ksyslog_queue ksyslog_queue; 
    22 static struct socket *ksyslog_rcv_sk = NULL; 
    23  
    24 static struct delayed_work ksyslog_work; 
    25 static struct workqueue_struct *ksyslog_wq = NULL; 
     20static struct socket *ksyslog_rcv_sk; 
     21static struct workqueue_struct *ksyslog_wq; 
     22static struct ksyslog_queue __percpu *ksyslog_queue; 
    2623 
    2724#ifdef CONFIG_PROC_FS 
    28 static struct proc_dir_entry *ksyslog_procdir = NULL; 
    29 static struct proc_dir_entry *ksyslog_proc_queue = NULL; 
    30 static struct proc_dir_entry *ksyslog_proc_size = NULL; 
    31 static struct proc_dir_entry *ksyslog_proc_stats = NULL; 
     25static struct proc_dir_entry *ksyslog_procdir; 
     26static struct proc_dir_entry *ksyslog_proc_size; 
     27static struct proc_dir_entry *ksyslog_proc_stats; 
    3228#endif 
    3329 
     
    3632static char *ksyslog_path = "/var/log/ksyslog.log"; 
    3733static ulong ksyslog_queue_size_max = 4096; 
    38 static ulong ksyslog_flush_interval = 45;  /* milliseconds */ 
    39  
    40 static DEFINE_SPINLOCK(ksyslog_write_lock); 
    4134 
    4235module_param(ksyslog_host, charp, 0444); 
     
    4437module_param(ksyslog_path, charp, 0644); 
    4538module_param(ksyslog_queue_size_max, ulong, 0644); 
    46 module_param(ksyslog_flush_interval, ulong, 0644); 
    47  
    48 static int 
    49 ksyslog_queue_init(struct ksyslog_queue *queue) 
    50 { 
    51         memset(queue, 0, sizeof(*queue)); 
    52         INIT_LIST_HEAD(&queue->head); 
    53         spin_lock_init(&queue->lock); 
    54         atomic64_set(&queue->size, 0); 
    55         queue->stats = alloc_percpu(struct ksyslog_stats); 
    56         if (unlikely(queue->stats == NULL)) 
     39 
     40static int 
     41ksyslog_queue_init(void (*handler)(struct work_struct *)) 
     42{ 
     43        int cpu; 
     44        struct ksyslog_queue *q; 
     45 
     46        ksyslog_queue = alloc_percpu(struct ksyslog_queue); 
     47        if (unlikely(!ksyslog_queue)) 
    5748                return -ENOMEM; 
     49 
     50        for_each_possible_cpu(cpu) { 
     51                q = per_cpu_ptr(ksyslog_queue, cpu); 
     52 
     53                INIT_LIST_HEAD(&q->head); 
     54                INIT_WORK(&q->work, handler); 
     55                spin_lock_init(&q->lock); 
     56                atomic64_set(&q->size, 0); 
     57                ksyslog_stats_zero(&q->write_stats); 
     58                ksyslog_stats_zero(&q->drop_stats); 
     59                ksyslog_stats_zero(&q->discard_stats); 
     60        } 
     61 
    5862        return 0; 
    5963} 
    6064 
    6165static void 
    62 ksyslog_queue_uninit(struct ksyslog_queue *queue) 
    63 { 
    64         if (likely(queue->stats)) 
    65                 free_percpu(queue->stats); 
    66         queue->stats = NULL; 
     66ksyslog_queue_uninit(void) 
     67{ 
     68        if (likely(ksyslog_queue)) 
     69                free_percpu(ksyslog_queue); 
     70        ksyslog_queue = NULL; 
    6771} 
    6872 
     
    161165 
    162166        start = memchr(skb->data, '>', 5); 
    163         if (start == NULL) 
     167        if (!start) 
    164168                return ERR_PTR(-EINVAL); 
    165169        start++; 
     
    186190 
    187191        entry = kzalloc(sizeof(*entry), GFP_ATOMIC); 
    188         if (unlikely(entry == NULL)) 
     192        if (unlikely(!entry)) 
    189193                return ERR_PTR(-ENOMEM); 
    190194 
    191195        length = skb->len - (start - skb->data); 
    192196        entry->data = kzalloc(length, GFP_ATOMIC); 
    193         if (unlikely(entry->data == NULL)) { 
     197        if (unlikely(!entry->data)) { 
    194198                kfree(entry); 
    195199                return ERR_PTR(-ENOMEM); 
     
    264268{ 
    265269        *buf = kzalloc(54 + entry->length + 2, GFP_ATOMIC); 
    266         if (unlikely(*buf == NULL)) 
     270        if (unlikely(!*buf)) 
    267271                return -ENOMEM; 
    268272 
     
    297301 
    298302static void 
    299 ksyslog_work_register(unsigned long timer) 
    300 { 
    301         queue_delayed_work(ksyslog_wq, &ksyslog_work, timer * HZ / 1000); 
    302 } 
    303  
    304 static void 
    305 ksyslog_work_unregister(void) 
    306 { 
    307         cancel_delayed_work_sync(&ksyslog_work); 
    308 } 
    309  
    310 static void 
    311303ksyslog_work_handler(struct work_struct *work) 
    312304{ 
    313         struct file *file = NULL; 
     305        struct file *file; 
    314306        struct ksyslog_entry *entry; 
     307        struct ksyslog_queue *q; 
     308 
     309        q = container_of(work, struct ksyslog_queue, work); 
    315310 
    316311        file = ksyslog_open(ksyslog_path); 
    317312        if (unlikely(IS_ERR(file))) 
    318                 goto out; 
    319  
    320         while (true) { 
    321                 bool write_ok; 
    322  
    323                 spin_lock_bh(&ksyslog_queue.lock); 
    324                 entry = list_first_or_null_rcu(&ksyslog_queue.head, 
    325                                                struct ksyslog_entry, list); 
    326                 if (!entry) { 
    327                         spin_unlock_bh(&ksyslog_queue.lock); 
    328                         break; 
    329                 } 
    330                 ksyslog_entry_del(&ksyslog_queue, entry, false); 
    331                 spin_unlock_bh(&ksyslog_queue.lock); 
    332  
    333                 spin_lock(&ksyslog_write_lock); 
    334                 write_ok = ksyslog_entry_write(file, entry); 
    335                 spin_unlock(&ksyslog_write_lock); 
    336  
    337                 if (likely(write_ok)) { 
    338                         ksyslog_stats_add_write(&ksyslog_queue, entry->length); 
    339                 } else { 
    340                         ksyslog_stats_add_drop(&ksyslog_queue, entry->length); 
    341                         ksyslog_drop_warning(entry); 
    342                 } 
    343  
    344                 call_rcu(&entry->rcu, ksyslog_entry_free); 
    345         } 
    346  
     313                return; 
     314 
     315        spin_lock_bh(&q->lock); 
     316        entry = list_first_or_null_rcu(&q->head, struct ksyslog_entry, list); 
     317        if (unlikely(!entry)) { 
     318                spin_unlock_bh(&q->lock); 
     319                goto out; 
     320        } 
     321        ksyslog_entry_del(q, entry, false); 
     322        spin_unlock_bh(&q->lock); 
     323 
     324        if (likely(ksyslog_entry_write(file, entry))) { 
     325                ksyslog_stats_add(&q->write_stats, entry->length); 
     326        } else { 
     327                ksyslog_stats_add(&q->drop_stats, entry->length); 
     328                ksyslog_drop_warning(entry); 
     329        } 
     330 
     331        call_rcu(&entry->rcu, ksyslog_entry_free); 
     332 
     333out: 
    347334        ksyslog_close(file); 
    348335 
    349 out: 
    350         ksyslog_work_register(ksyslog_flush_interval); 
     336        if (atomic64_read(&q->size) > 0) 
     337                queue_work(ksyslog_wq, work); 
    351338} 
    352339 
     
    358345        struct udphdr *udph; 
    359346        struct ksyslog_entry *entry; 
     347        struct ksyslog_queue *q; 
     348 
     349        q = per_cpu_ptr(ksyslog_queue, smp_processor_id()); 
    360350 
    361351        if (unlikely(skb_linearize(skb))) { 
    362                 ksyslog_stats_add_drop(&ksyslog_queue, skb->len); 
     352                ksyslog_stats_add(&q->drop_stats, skb->len); 
    363353                goto out; 
    364354        } 
     
    368358 
    369359        if (unlikely(!skb_pull(skb, sizeof(*udph)))) { 
    370                 ksyslog_stats_add_drop(&ksyslog_queue, skb->len); 
     360                ksyslog_stats_add(&q->drop_stats, skb->len); 
    371361                goto out; 
    372362        } 
     
    375365        if (unlikely(IS_ERR(entry))) { 
    376366                if (PTR_ERR(entry) == -EINVAL) { 
    377                         ksyslog_stats_add_discard(&ksyslog_queue, skb->len); 
     367                        ksyslog_stats_add(&q->discard_stats, skb->len); 
    378368                        goto out; 
    379369                } 
    380370 
    381                 ksyslog_stats_add_drop(&ksyslog_queue, skb->len); 
    382                 goto out; 
    383         } 
    384  
    385         spin_lock_bh(&ksyslog_queue.lock); 
    386         err = ksyslog_entry_add(&ksyslog_queue, entry); 
    387         spin_unlock_bh(&ksyslog_queue.lock); 
     371                ksyslog_stats_add(&q->drop_stats, skb->len); 
     372                goto out; 
     373        } 
     374 
     375        spin_lock_bh(&q->lock); 
     376        err = ksyslog_entry_add(q, entry); 
     377        spin_unlock_bh(&q->lock); 
    388378 
    389379        if (unlikely(err)) { 
    390                 ksyslog_stats_add_drop(&ksyslog_queue, entry->length); 
     380                ksyslog_stats_add(&q->drop_stats, entry->length); 
    391381                ksyslog_drop_warning(entry); 
    392382                ksyslog_entry_free(&entry->rcu); 
    393383                goto out; 
    394384        } 
     385 
     386        queue_work(ksyslog_wq, &q->work); 
    395387 
    396388out: 
     
    400392 
    401393#ifdef CONFIG_PROC_FS 
    402 static void * 
    403 ksyslog_rculist_seq_start(struct seq_file *seq, loff_t *pos) 
    404 { 
    405         struct list_head *lh, *head = seq->private; 
    406         loff_t ppos = *pos; 
    407  
    408         rcu_read_lock(); 
    409  
    410         __list_for_each_rcu(lh, head) 
    411                 if (ppos-- == 0) 
    412                         return lh; 
    413  
    414         return NULL; 
    415 } 
    416  
    417 static void * 
    418 ksyslog_rculist_seq_next(struct seq_file *seq, void *v, loff_t *pos) 
    419 { 
    420         struct list_head *lh = rcu_dereference(((struct list_head *)v)->next); 
    421         ++(*pos); 
    422         return lh == seq->private ? NULL : lh; 
    423 } 
    424  
    425 static void 
    426 ksyslog_rculist_seq_stop(struct seq_file *seq, void *v) 
    427 { 
    428         rcu_read_unlock(); 
    429 } 
    430  
    431 static int 
    432 ksyslog_queue_seq_show(struct seq_file *seq, void *v) 
    433 { 
    434         const struct ksyslog_entry *entry = list_entry_rcu(v, struct ksyslog_entry, list); 
    435  
    436         seq_printf(seq, "%llu %s.%s %u.%u.%u.%u %.*s\n", 
    437                    timeval_to_ns(&entry->tv) / 1000 / 1000 / 1000, 
    438                    ksyslog_facility_str(entry->facility), 
    439                    ksyslog_severity_str(entry->severity), 
    440                    entry->saddr.addr8[0], entry->saddr.addr8[1], 
    441                    entry->saddr.addr8[2], entry->saddr.addr8[3], 
    442                    (int)entry->length, entry->data); 
    443  
     394static int 
     395ksyslog_size_seq_show(struct seq_file *seq, void *v) 
     396{ 
     397        int cpu; 
     398        struct ksyslog_queue *q; 
     399 
     400        seq_puts(seq, "{\n"); 
     401 
     402        for_each_possible_cpu(cpu) { 
     403                q = per_cpu_ptr(ksyslog_queue, cpu); 
     404                seq_printf(seq, "  \"%u\": \"%lu\",\n", cpu, atomic64_read(&q->size)); 
     405        } 
     406 
     407        seq_puts(seq, "}\n"); 
    444408        return 0; 
    445409} 
    446410 
    447 static struct seq_operations ksyslog_queue_seq_ops = { 
    448         .start = ksyslog_rculist_seq_start, 
    449         .next  = ksyslog_rculist_seq_next, 
    450         .stop  = ksyslog_rculist_seq_stop, 
    451         .show  = ksyslog_queue_seq_show, 
    452 }; 
    453  
    454 static int 
    455 ksyslog_queue_seq_open(struct inode *inode, struct file *file) 
    456 { 
    457         int err = seq_open(file, &ksyslog_queue_seq_ops); 
    458  
    459         if (!err) 
    460                 ((struct seq_file *)file->private_data)->private = PDE_DATA(inode); 
    461  
    462         return err; 
    463 } 
    464  
    465 static struct file_operations ksyslog_queue_fops = { 
    466         .owner   = THIS_MODULE, 
    467         .open    = ksyslog_queue_seq_open, 
    468         .read    = seq_read, 
    469         .llseek  = seq_lseek, 
    470         .release = seq_release, 
    471 }; 
    472  
    473 static int 
    474 ksyslog_size_seq_show(struct seq_file *seq, void *v) 
    475 { 
    476         seq_printf(seq, "%lu\n", atomic64_read(&ksyslog_queue.size)); 
    477         return 0; 
    478 } 
    479  
    480411static int 
    481412ksyslog_size_seq_open(struct inode *inode, struct file *file) 
     
    487418ksyslog_stats_seq_show(struct seq_file *seq, void *v) 
    488419{ 
    489         int i; 
    490         struct ksyslog_stats stats; 
    491  
    492         memset(&stats, 0, sizeof(stats)); 
    493  
    494         for_each_possible_cpu(i) { 
    495                 const struct ksyslog_stats *percpu_stats; 
    496                 struct ksyslog_stats local_stats; 
    497                 unsigned int start; 
    498  
    499                 percpu_stats = per_cpu_ptr(ksyslog_queue.stats, i); 
    500  
    501                 do { 
    502                         start = u64_stats_fetch_begin_bh(&percpu_stats->sync); 
    503                         local_stats = *percpu_stats; 
    504                 } while (u64_stats_fetch_retry_bh(&percpu_stats->sync, start)); 
    505  
    506                 stats.write_bytes += local_stats.write_bytes; 
    507                 stats.write_packets += local_stats.write_packets; 
    508                 stats.drop_bytes += local_stats.drop_bytes; 
    509                 stats.drop_packets += local_stats.drop_packets; 
    510                 stats.discard_bytes += local_stats.discard_bytes; 
    511                 stats.discard_packets += local_stats.discard_packets; 
    512         } 
    513  
    514         seq_puts(seq,   "{\n"); 
    515         seq_puts(seq,   "  \"write\": {\n"); 
    516         seq_printf(seq, "    \"bytes\":   \"%llu\",\n", stats.write_bytes); 
    517         seq_printf(seq, "    \"packets\": \"%llu\"\n", stats.write_packets); 
    518         seq_puts(seq,   "  },\n"); 
    519         seq_puts(seq,   "  \"drop\": {\n"); 
    520         seq_printf(seq, "    \"bytes\":   \"%llu\",\n", stats.drop_bytes); 
    521         seq_printf(seq, "    \"packets\": \"%llu\"\n", stats.drop_packets); 
    522         seq_puts(seq,   "  },\n"); 
    523         seq_puts(seq,   "  \"discard\": {\n"); 
    524         seq_printf(seq, "    \"bytes\":   \"%llu\",\n", stats.discard_bytes); 
    525         seq_printf(seq, "    \"packets\": \"%llu\"\n", stats.discard_packets); 
    526         seq_puts(seq,   "  }\n"); 
    527         seq_puts(seq,   "}\n"); 
    528  
     420        int cpu; 
     421        struct ksyslog_queue *q; 
     422 
     423        seq_puts(seq, "{\n"); 
     424 
     425        for_each_possible_cpu(cpu) { 
     426                q = per_cpu_ptr(ksyslog_queue, cpu); 
     427 
     428                seq_printf(seq, "  \"%u\": {\n", cpu); 
     429                seq_puts(seq,   "    \"write\": {\n"); 
     430                seq_printf(seq, "      \"bytes\":   \"%lu\",\n", atomic64_read(&q->write_stats.bytes)); 
     431                seq_printf(seq, "      \"packets\": \"%lu\",\n", atomic64_read(&q->write_stats.packets)); 
     432                seq_puts(seq,   "    },\n"); 
     433                seq_puts(seq,   "    \"drop\": {\n"); 
     434                seq_printf(seq, "      \"bytes\":   \"%lu\",\n", atomic64_read(&q->drop_stats.bytes)); 
     435                seq_printf(seq, "      \"packets\": \"%lu\",\n", atomic64_read(&q->drop_stats.packets)); 
     436                seq_puts(seq,   "    },\n"); 
     437                seq_puts(seq,   "    \"discard\": {\n"); 
     438                seq_printf(seq, "      \"bytes\":   \"%lu\",\n", atomic64_read(&q->discard_stats.bytes)); 
     439                seq_printf(seq, "      \"packets\": \"%lu\",\n", atomic64_read(&q->discard_stats.packets)); 
     440                seq_puts(seq,   "    },\n"); 
     441                seq_puts(seq,   "  },\n"); 
     442        } 
     443 
     444        seq_puts(seq, "}\n"); 
    529445        return 0; 
    530446} 
     
    555471ksyslog_proc_destroy(void) 
    556472{ 
    557         if (ksyslog_proc_queue) 
    558                 remove_proc_entry("queue", ksyslog_procdir); 
    559         ksyslog_proc_queue = NULL; 
    560  
    561473        if (ksyslog_proc_size) 
    562474                remove_proc_entry("size", ksyslog_procdir); 
     
    576488{ 
    577489        ksyslog_procdir = proc_mkdir("ksyslog", NULL); 
    578         if (ksyslog_procdir == NULL) { 
     490        if (!ksyslog_procdir) { 
    579491                pr_err("ksyslog: proc_mkdir failed\n"); 
    580                 goto err; 
    581         } 
    582  
    583         ksyslog_proc_queue = proc_create_data("queue", S_IRUGO, ksyslog_procdir, 
    584                                               &ksyslog_queue_fops, &ksyslog_queue.head); 
    585         if (ksyslog_proc_queue == NULL) { 
    586                 pr_err("ksyslog: proc_create(queue) failed\n"); 
    587492                goto err; 
    588493        } 
     
    590495        ksyslog_proc_size = proc_create("size", S_IRUGO, ksyslog_procdir, 
    591496                                        &ksyslog_size_fops); 
    592         if (ksyslog_proc_size == NULL) { 
     497        if (!ksyslog_proc_size) { 
    593498                pr_err("ksyslog: proc_create(size) failed\n"); 
    594499                goto err; 
     
    597502        ksyslog_proc_stats = proc_create("stats", S_IRUGO, ksyslog_procdir, 
    598503                                         &ksyslog_stats_fops); 
    599         if (ksyslog_proc_stats == NULL) { 
     504        if (!ksyslog_proc_stats) { 
    600505                pr_err("ksyslog: proc_create(stats) failed\n"); 
    601506                goto err; 
     
    613518ksyslog_finish(void) 
    614519{ 
     520        int cpu; 
     521 
    615522        if (ksyslog_rcv_sk) 
    616523                sock_release(ksyslog_rcv_sk); 
    617524        ksyslog_rcv_sk = NULL; 
    618525 
    619         if (ksyslog_wq) { 
    620                 ksyslog_work_unregister(); 
     526        if (ksyslog_wq) 
    621527                destroy_workqueue(ksyslog_wq); 
    622         } 
    623528        ksyslog_wq = NULL; 
    624529 
     
    627532#endif 
    628533 
    629         ksyslog_entry_destroy(&ksyslog_queue); 
     534        for_each_possible_cpu(cpu) 
     535                ksyslog_entry_destroy(per_cpu_ptr(ksyslog_queue, cpu)); 
    630536        rcu_barrier(); 
    631537 
    632         ksyslog_queue_uninit(&ksyslog_queue); 
     538        ksyslog_queue_uninit(); 
    633539} 
    634540 
     
    639545        struct sockaddr_in sin; 
    640546 
    641         err = ksyslog_queue_init(&ksyslog_queue); 
     547        err = ksyslog_queue_init(ksyslog_work_handler); 
    642548        if (err) 
    643549                goto err; 
     
    650556 
    651557        ksyslog_wq = create_workqueue("ksyslog"); 
    652         if (ksyslog_wq == NULL) { 
     558        if (!ksyslog_wq) { 
    653559                pr_err("ksyslog: create_workqueue failed\n"); 
    654560                err = -ENOMEM; 
    655561                goto err; 
    656562        } 
    657  
    658         INIT_DELAYED_WORK(&ksyslog_work, ksyslog_work_handler); 
    659563 
    660564        err = sock_create(AF_INET, SOCK_DGRAM, 0, &ksyslog_rcv_sk); 
     
    675579        } 
    676580 
    677         ksyslog_work_register(ksyslog_flush_interval); 
    678  
    679581        udp_sk(ksyslog_rcv_sk->sk)->encap_type = UDP_ENCAP_KSYSLOG; 
    680582        udp_sk(ksyslog_rcv_sk->sk)->encap_rcv = ksyslog_rcv; 
Note: See TracChangeset for help on using the changeset viewer.