Rößner-Network-Solutions

Commit e48ee126 authored by David Meyer's avatar David Meyer
Browse files

Added functionality to anonymize domains and general strings in addition to ip addresses.

Added rules for fortigate, infoblox, exchange
parent 66660e57
#!/usr/bin/env python
#!/usr/local/bin/python3
# copyright sys4 AG 2015
# This file is part of loganon.
#
#
# loganon is free software: you can redistribute it and/or modify it under the
# terms of the GNU Lesser General Public License as published by the Free
# Software Foundation, either version 3 of the License, or (at your option) any
# later version.
#
#
# loganon is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
#
# You should have received a copy of the GNU Lesser General Public License
# along with loganon. If not, see <http://www.gnu.org/licenses/>.
......@@ -21,10 +21,16 @@ import os
import sys
import re
import yaml
import magic
import io
from getopt import getopt
from netaddr import IPAddress, IPNetwork
from ipaddress import ip_address
from netaddr.core import AddrFormatError
import hashlib
#import zlib
#from random import randint
try:
from collections import OrderedDict
......@@ -46,7 +52,7 @@ def ordered_load(stream, Loader=yaml.Loader, object_pairs_hook=OrderedDict):
def usage():
"""Print a simple usage to stdout
"""
print """%s [options]
print("""%s [options]
-h, --help prints out this help
-i, --input=file log file to read
......@@ -59,7 +65,7 @@ Optional:
-6, --mask6=number number of bits to mask an IPv6 address
-t, --test test pattern and print output to stdout
""" % os.path.basename(__file__)
""" % os.path.basename(__file__))
def main():
"""Main application
......@@ -92,6 +98,13 @@ def main():
ipv4 = re.compile("[1-9][0-9]{0,2}\.[0-9.]{3,7}\.[0-9]{1,3}")
ipv6 = re.compile("([1-9a-fA-F][0-9a-fA-F]{3}):"
"[0-9a-fA-F:]{2,29}[0-9a-fA-F]{1,4}")
domain = re.compile("([^\s=\"\(\):]*\.)?[a-zA-Z0-9][a-zA-Z0-9-]{1,61}[a-zA-Z0-9]\.[a-zA-Z]{2,}")
syslog_prio = re.compile("(auth|cron|daemon|kern|local[0-7]|lpr|mail|news|user|uucp)\.(info|notice|warning|err|alert|warn|debug|emerg|crit)", re.IGNORECASE)
def get_encoding(file):
blob = open(file, "rb").read()
m = magic.Magic(mime_encoding=True)
return m.from_buffer(blob)
# Read command line options
try:
......@@ -135,10 +148,10 @@ def main():
usage()
sys.exit(os.EX_USAGE)
except Exception, e:
except Exception as e:
print >> sys.stderr, "Syntax error: %s" % e
sys.exit(os.EX_USAGE)
# Read all rules
try:
for rule in iter(rules):
......@@ -147,22 +160,22 @@ def main():
yaml.SafeLoader,
OrderedDict))
except IOError, e:
except IOError as e:
print >> sys.stderr, "IOError: %s" % e
sys.exit(os.EX_IOERR)
except Exception, e:
except Exception as e:
print >> sys.stderr, "Unknown error: %s" % e
sys.exit(os.EX_USAGE)
# Build macro dictionary
for rule_entity in iter(rules_collection):
for service, ruledef in rule_entity.iteritems():
for rulename, rulepattern in ruledef.iteritems():
for service, ruledef in rule_entity.items():
for rulename, rulepattern in ruledef.items():
search = None
replace = None
for patterndef in iter(rulepattern):
for actiondesc, actiondef in patterndef.iteritems():
for actiondesc, actiondef in patterndef.items():
if actiondesc == "search":
search = actiondef
if actiondesc == "replace":
......@@ -175,26 +188,26 @@ def main():
sys.exit(os.EX_USAGE)
try:
rule_data[rulename] = (re.compile(search), replace)
except Exception, e:
except Exception as e:
print >> sys.stderr, ("Syntax error in <search> or "
"<replace> pattern: %s" % e)
sys.exit(os.EX_USAGE)
# Open input and output files
try:
fd_in = open(fdinarg, "r")
fd_in = io.open(fdinarg, "r", encoding=get_encoding(fdinarg))
if not test:
fd_out = open(fdoutarg, "w")
except IOError, e:
print >> sys.stderr, "IOError: %s" % e
except IOError as e:
print("IOError: %s" % e, file=sys.stderr)
sys.exit(os.EX_IOERR)
except Exception, e:
print >> sys.stderr, "Unknown error: %s" % e
except Exception as e:
print("Unknown error: %s" % e, file=sys.stderr)
sys.exit(os.EX_USAGE)
def reduce_ip(matchobj):
def maybe_ip(matchobj):
maybe_ip = False
# simple tests
......@@ -217,8 +230,10 @@ def main():
test = matchobj.group(0).split(":")
if len(test) >= 2:
maybe_ip = True
return maybe_ip
if maybe_ip:
def reduce_ip(matchobj):
if maybe_ip(matchobj):
try:
ip = IPAddress(matchobj.group(0))
except AddrFormatError:
......@@ -235,25 +250,105 @@ def main():
else:
return matchobj.group(0)
def ips(start, end):
'''Return IPs in IPv4 range, inclusive.'''
start_int = int(ip_address(start).packed.hex(), 16)
end_int = int(ip_address(end).packed.hex(), 16)
return [ip_address(ip).exploded for ip in range(start_int, end_int)]
iprepo_global = ips('1.2.0.1', '1.2.20.254')
iprepo_private = ips('10.10.0.1', '10.10.20.254')
ip_map = {}
def map_ip(matchobj):
if maybe_ip(matchobj):
try:
ip = IPAddress(matchobj.group(0))
except AddrFormatError:
# might be something else than an IPv6 address
return matchobj.group(0)
if ip.version == 4:
if not str(ip) in ip_map:
if ip.is_private():
ip_map[str(ip)] = iprepo_private.pop(0)
else:
ip_map[str(ip)] = iprepo_global.pop(0)
return ip_map[str(ip)]
#return str(bitmask4 & ip)
else:
return str(bitmask6 & ip)
return str(ip)
else:
return matchobj.group(0)
def string_hash(_str):
_hash = list(hashlib.md5(_str.encode()).hexdigest())
ret = ''
for i in range(0,len(_hash),2):
ret += map_char(_hash[i] + _hash[i+1])
return ret
domain_map = {}
def map_domain(matchobj):
_domain = matchobj.group(0)
if re.match(syslog_prio, _domain) is not None:
return _domain
if not _domain in domain_map:
parts = _domain.split('.')
for idx in range(len(parts)):
parts[idx] = string_hash(parts[idx])
domain_map[_domain] = '.'.join(parts)
#print(_domain + '=' + domain_map[_domain])
return domain_map[_domain]
def map_char(hex):
ic = int(hex, 16)
offset = int(ic / 255 * 50)
if offset <= 25:
return chr(65 + offset)
else:
return chr(72 + offset)
string_map = {}
def map_string(_str):
if not _str in string_map:
string_map[_str] = string_hash(_str)
return string_map[_str]
while True:
line = fd_in.readline()
if not line:
break
# Phase 1 - search and replace pattern
for key, value in rule_data.iteritems():
for key, value in rule_data.items():
try:
linenew = value[0].sub(value[1], line)
replace = value[1]
if '_MAP_' in replace:
found = value[0].search(line)
if found:
_str = found.group(1)
replace = replace.replace('_MAP_', map_string(_str))
linenew = value[0].sub(replace, line)
if linenew is not None:
line = linenew
except Exception, e:
except Exception as e:
print >> sys.stderr, e
# Phase 2 - find IPv4/IPv6 address
line = re.sub(ipv4, reduce_ip, line)
line = re.sub(ipv4, map_ip, line)
line = re.sub(ipv6, reduce_ip, line)
# Phase 3 - search and replace domains
line = re.sub(domain, map_domain, line)
if test:
print line.strip()
print(line.strip())
else:
fd_out.write(line)
......
# copyright sys4 AG 2015
# This file is part of loganon.
#
# loganon is free software: you can redistribute it and/or modify it under the
# terms of the GNU Lesser General Public License as published by the Free
# Software Foundation, either version 3 of the License, or (at your option) any
# later version.
#
# loganon is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with loganon. If not, see <http://www.gnu.org/licenses/>.
exchange:
sender-address:
- search: "sender-address:([^@]+)@"
- replace: "sender-address:_MAP_@"
return-path:
- search: "return-path:([^@]+)@"
- replace: "return-path:_MAP_@"
recipient-address:
- search: "recipient-address:([^@]+)@"
- replace: "recipient-address:_MAP_@"
UserID:
- search: "UserID:([^,]+)"
- replace: "UserID:_MAP_"
AccountName:
- search: "AccountName:([^,]+)"
- replace: "AccountName:_MAP_"
Domain:
- search: "Domain:([^,]+)"
- replace: "Domain:_MAP_"
# vim: syn=yaml ts=2 sw=2 expandtab
# copyright sys4 AG 2015
# This file is part of loganon.
#
# loganon is free software: you can redistribute it and/or modify it under the
# terms of the GNU Lesser General Public License as published by the Free
# Software Foundation, either version 3 of the License, or (at your option) any
# later version.
#
# loganon is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with loganon. If not, see <http://www.gnu.org/licenses/>.
fortigate:
devname:
- search: "devname=[^ ]+"
- replace: "devname=DEVNAME"
devid:
- search: "devid=[^ ]+"
- replace: "devid=DEVID"
#srcip:
# - search: "srcip=[^ ]+"
# - replace: "srcip=IP"
#dstip:
# - search: "dstip=[^ ]+"
# - replace: "dstip=IP"
srcintf:
- search: "srcintf=[^ ]+"
- replace: "srcintf=\"SRCINTF\""
dstintf:
- search: "dstintf=[^ ]+"
- replace: "dstintf=\"DSTINTF\""
srccountry:
- search: "srccountry=[^ ]+"
- replace: "srccountry=\"COUNTRY\""
dstcountry:
- search: "dstcountry=[^ ]+"
- replace: "dstcountry=\"COUNTRY\""
# vim: syn=yaml ts=2 sw=2 expandtab
# copyright sys4 AG 2015
# This file is part of loganon.
#
# loganon is free software: you can redistribute it and/or modify it under the
# terms of the GNU Lesser General Public License as published by the Free
# Software Foundation, either version 3 of the License, or (at your option) any
# later version.
#
# loganon is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with loganon. If not, see <http://www.gnu.org/licenses/>.
fortigate:
devname:
- search: "devname=[^ ]+"
- replace: "devname=DEVNAME"
devid:
- search: "devid=[^ ]+"
- replace: "devid=DEVID"
#srcip:
# - search: "srcip=[^ ]+"
# - replace: "srcip=IP"
#dstip:
# - search: "dstip=[^ ]+"
# - replace: "dstip=IP"
srcintf:
- search: "srcintf=[^ ]+"
- replace: "srcintf=SRCINTF"
dstintf:
- search: "dstintf=[^ ]+"
- replace: "dstintf=DSTINTF"
srccountry:
- search: "srccountry=[^ ]+"
- replace: "srccountry=\"COUNTRY\""
dstcountry:
- search: "dstcountry=[^ ]+"
- replace: "dstcountry=\"COUNTRY\""
# vim: syn=yaml ts=2 sw=2 expandtab
# copyright sys4 AG 2015
# This file is part of loganon.
#
# loganon is free software: you can redistribute it and/or modify it under the
# terms of the GNU Lesser General Public License as published by the Free
# Software Foundation, either version 3 of the License, or (at your option) any
# later version.
#
# loganon is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with loganon. If not, see <http://www.gnu.org/licenses/>.
webproxy:
user:
- search: "user=\"([^\"]+)\""
- replace: "user=\"_MAP_\""
# vim: syn=yaml ts=2 sw=2 expandtab
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment