00001 //--------------------------------------------------------------------------
00002 // Copyright (C) 2014-2017 Cisco and/or its affiliates. All rights reserved.
00003 // Copyright (C) 2013-2013 Sourcefire, Inc.
00004 //
00005 // This program is free software; you can redistribute it and/or modify it
00006 // under the terms of the GNU General Public License Version 2 as published
00007 // by the Free Software Foundation. You may not use, modify or distribute
00008 // this program under any other version of the GNU General Public License.
00009 //
00010 // This program is distributed in the hope that it will be useful, but
00011 // WITHOUT ANY WARRANTY; without even the implied warranty of
00012 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
00013 // General Public License for more details.
00014 //
00015 // You should have received a copy of the GNU General Public License along
00016 // with this program; if not, write to the Free Software Foundation, Inc.,
00017 // 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
00018 //--------------------------------------------------------------------------
00019 // analyzer.cc author Russ Combs <rucombs@cisco.com>
00020
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif

#include "analyzer.h"

#include <mutex>
#include <thread>

#include "log/messages.h"
#include "main/swapper.h"
#include "main.h"
#include "packet_io/sfdaq.h"

#include "analyzer_command.h"
#include "snort.h"
#include "snort_debug.h"
#include "thread.h"
00038
00039 using namespace std;
00040
00041 typedef DAQ_Verdict
00042 (* PacketCallback)(void*, const DAQ_PktHdr_t*, const uint8_t*);
00043
00044 // FIXIT-M add fail open capability
00045 static THREAD_LOCAL PacketCallback main_func = Snort::packet_callback;
00046
00047 //-------------------------------------------------------------------------
00048 // analyzer
00049 //-------------------------------------------------------------------------
00050
00051 void Analyzer::set_state(State s)
00052 {
00053 state = s;
00054 main_poke(id);
00055 }
00056
00057 const char* Analyzer::get_state_string()
00058 {
00059 State s = get_state(); // can't use atomic in switch with optimization
00060
00061 switch ( s )
00062 {
00063 case State::NEW: return "NEW";
00064 case State::INITIALIZED: return "INITIALIZED";
00065 case State::STARTED: return "STARTED";
00066 case State::RUNNING: return "RUNNING";
00067 case State::PAUSED: return "PAUSED";
00068 case State::STOPPED: return "STOPPED";
00069 }
00070
00071 return "UNKNOWN";
00072 }
00073
00074 Analyzer::Analyzer(unsigned i, const char* s)
00075 {
00076 id = i;
00077 source = s ? s : "";
00078 daq_instance = nullptr;
00079 privileged_start = false;
00080 exit_requested = false;
00081 set_state(State::NEW);
00082 }
00083
00084 void Analyzer::operator()(Swapper* ps, uint16_t run_num)
00085 {
00086 set_thread_type(STHREAD_TYPE_PACKET);
00087 set_instance_id(id);
00088 set_run_num(run_num);
00089
00090 ps->apply();
00091 delete ps;
00092
00093 if (Snort::thread_init_privileged(source.c_str()))
00094 {
00095 daq_instance = SFDAQ::get_local_instance();
00096 privileged_start = daq_instance->can_start_unprivileged();
00097 set_state(State::INITIALIZED);
00098
00099 analyze();
00100
00101 Snort::thread_term();
00102 }
00103
00104 set_state(State::STOPPED);
00105 }
00106
00107 /* Note: This will be called from the main thread. Everything it does must be
00108 thread-safe in relation to interactions with the analyzer thread. */
00109 void Analyzer::execute(AnalyzerCommand* ac)
00110 {
00111 pending_work_queue_mutex.lock();
00112 pending_work_queue.push(ac);
00113 pending_work_queue_mutex.unlock();
00114
00115 /* Break out of the DAQ acquire loop so that the command will be processed.
00116 This is explicitly safe to call from another thread. */
00117 if ( state == State::RUNNING and daq_instance )
00118 daq_instance->break_loop(0);
00119 }
00120
00121 bool Analyzer::handle_command()
00122 {
00123 AnalyzerCommand* ac = nullptr;
00124
00125 pending_work_queue_mutex.lock();
00126 if (!pending_work_queue.empty())
00127 {
00128 ac = pending_work_queue.front();
00129 pending_work_queue.pop();
00130 }
00131 pending_work_queue_mutex.unlock();
00132
00133 if (!ac)
00134 return false;
00135
00136 ac->execute(*this);
00137
00138 completed_work_queue_mutex.lock();
00139 completed_work_queue.push(ac);
00140 completed_work_queue_mutex.unlock();
00141
00142 return true;
00143 }
00144
00145 void Analyzer::analyze()
00146 {
00147 // The main analyzer loop is terminated by a command returning false or an error during acquire
00148 while (!exit_requested)
00149 {
00150 if (handle_command())
00151 continue;
00152
00153 // If we're not in the running state (usually either pre-start or paused),
00154 // just keep stalling until something else comes up.
00155 if (state != State::RUNNING)
00156 {
00157 chrono::milliseconds ms(10);
00158 this_thread::sleep_for(ms);
00159 continue;
00160 }
00161 if (daq_instance->acquire(0, main_func))
00162 break;
00163
00164 // FIXIT-L acquire(0) makes idle processing unlikely under high traffic
00165 // because it won't return until no packets, signal, etc. that means
00166 // the idle processing may not be useful or that we need a hook to do
00167 // things periodically even when traffic is available
00168 Snort::thread_idle();
00169 }
00170 }
00171
00172 void Analyzer::start()
00173 {
00174 assert(state == State::INITIALIZED);
00175
00176 if (!daq_instance->start())
00177 {
00178 ErrorMessage("Analyzer: Failed to start DAQ instance\n");
00179 exit_requested = true;
00180 }
00181 set_state(State::STARTED);
00182 DebugMessage(DEBUG_ANALYZER, "Handled START command\n");
00183 }
00184
00185 void Analyzer::run(bool paused)
00186 {
00187 assert(state == State::STARTED);
00188 Snort::thread_init_unprivileged();
00189 if ( paused )
00190 set_state(State::PAUSED);
00191 else
00192 set_state(State::RUNNING);
00193 DebugMessage(DEBUG_ANALYZER, "Handled RUN command\n");
00194 }
00195
00196 void Analyzer::stop()
00197 {
00198 exit_requested = true;
00199 DebugMessage(DEBUG_ANALYZER, "Handled STOP command\n");
00200 }
00201
00202 void Analyzer::pause()
00203 {
00204 if (state == State::RUNNING)
00205 set_state(State::PAUSED);
00206 else
00207 ErrorMessage("Analyzer: Received PAUSE command while in state %s\n",
00208 get_state_string());
00209 }
00210
00211 void Analyzer::resume()
00212 {
00213 if (state == State::PAUSED)
00214 set_state(State::RUNNING);
00215 else
00216 ErrorMessage("Analyzer: Received RESUME command while in state %s\n",
00217 get_state_string());
00218 }
00219
00220 void Analyzer::reload_daq()
00221 {
00222 if (daq_instance)
00223 daq_instance->reload();
00224 DebugMessage(DEBUG_ANALYZER, "Handled RELOAD command\n");
00225 }
00226
END OF CODE