Rößner-Network-Solutions

Unverified Commit 47ec020c authored by Christian Rößner's avatar Christian Rößner Committed by GitHub
Browse files

Merge pull request #2 from dameyerdave/master

Added functionality to anonymize domains  and general strings...
parents 66660e57 b07e4b11
#!/usr/bin/env python
#!/usr/bin/env python3
# copyright sys4 AG 2015
# This file is part of loganon.
#
#
# loganon is free software: you can redistribute it and/or modify it under the
# terms of the GNU Lesser General Public License as published by the Free
# Software Foundation, either version 3 of the License, or (at your option) any
# later version.
#
#
# loganon is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
#
# You should have received a copy of the GNU Lesser General Public License
# along with loganon. If not, see <http://www.gnu.org/licenses/>.
......@@ -21,10 +21,16 @@ import os
import sys
import re
import yaml
import magic
import io
from getopt import getopt
from netaddr import IPAddress, IPNetwork
from ipaddress import ip_address
from netaddr.core import AddrFormatError
import hashlib
#import zlib
#from random import randint
try:
from collections import OrderedDict
......@@ -46,7 +52,7 @@ def ordered_load(stream, Loader=yaml.Loader, object_pairs_hook=OrderedDict):
def usage():
"""Print a simple usage to stdout
"""
print """%s [options]
print("""%s [options]
-h, --help prints out this help
-i, --input=file log file to read
......@@ -59,7 +65,7 @@ Optional:
-6, --mask6=number number of bits to mask an IPv6 address
-t, --test test pattern and print output to stdout
""" % os.path.basename(__file__)
""" % os.path.basename(__file__))
def main():
"""Main application
......@@ -92,6 +98,13 @@ def main():
ipv4 = re.compile("[1-9][0-9]{0,2}\.[0-9.]{3,7}\.[0-9]{1,3}")
ipv6 = re.compile("([1-9a-fA-F][0-9a-fA-F]{3}):"
"[0-9a-fA-F:]{2,29}[0-9a-fA-F]{1,4}")
domain = re.compile("([^\s=\"\(\):]*\.)?[a-zA-Z0-9][a-zA-Z0-9-]{1,61}[a-zA-Z0-9]\.[a-zA-Z]{2,}")
syslog_prio = re.compile("(auth|cron|daemon|kern|local[0-7]|lpr|mail|news|user|uucp)\.(info|notice|warning|err|alert|warn|debug|emerg|crit)", re.IGNORECASE)
def get_encoding(file):
blob = open(file, "rb").read()
m = magic.Magic(mime_encoding=True)
return m.from_buffer(blob)
# Read command line options
try:
......@@ -135,10 +148,10 @@ def main():
usage()
sys.exit(os.EX_USAGE)
except Exception, e:
except Exception as e:
print >> sys.stderr, "Syntax error: %s" % e
sys.exit(os.EX_USAGE)
# Read all rules
try:
for rule in iter(rules):
......@@ -147,22 +160,22 @@ def main():
yaml.SafeLoader,
OrderedDict))
except IOError, e:
except IOError as e:
print >> sys.stderr, "IOError: %s" % e
sys.exit(os.EX_IOERR)
except Exception, e:
except Exception as e:
print >> sys.stderr, "Unknown error: %s" % e
sys.exit(os.EX_USAGE)
# Build macro dictionary
for rule_entity in iter(rules_collection):
for service, ruledef in rule_entity.iteritems():
for rulename, rulepattern in ruledef.iteritems():
for service, ruledef in rule_entity.items():
for rulename, rulepattern in ruledef.items():
search = None
replace = None
for patterndef in iter(rulepattern):
for actiondesc, actiondef in patterndef.iteritems():
for actiondesc, actiondef in patterndef.items():
if actiondesc == "search":
search = actiondef
if actiondesc == "replace":
......@@ -175,26 +188,26 @@ def main():
sys.exit(os.EX_USAGE)
try:
rule_data[rulename] = (re.compile(search), replace)
except Exception, e:
except Exception as e:
print >> sys.stderr, ("Syntax error in <search> or "
"<replace> pattern: %s" % e)
sys.exit(os.EX_USAGE)
# Open input and output files
try:
fd_in = open(fdinarg, "r")
fd_in = io.open(fdinarg, "r", encoding=get_encoding(fdinarg))
if not test:
fd_out = open(fdoutarg, "w")
except IOError, e:
print >> sys.stderr, "IOError: %s" % e
except IOError as e:
print("IOError: %s" % e, file=sys.stderr)
sys.exit(os.EX_IOERR)
except Exception, e:
print >> sys.stderr, "Unknown error: %s" % e
except Exception as e:
print("Unknown error: %s" % e, file=sys.stderr)
sys.exit(os.EX_USAGE)
def reduce_ip(matchobj):
def maybe_ip(matchobj):
maybe_ip = False
# simple tests
......@@ -217,8 +230,10 @@ def main():
test = matchobj.group(0).split(":")
if len(test) >= 2:
maybe_ip = True
return maybe_ip
if maybe_ip:
def reduce_ip(matchobj):
if maybe_ip(matchobj):
try:
ip = IPAddress(matchobj.group(0))
except AddrFormatError:
......@@ -235,25 +250,105 @@ def main():
else:
return matchobj.group(0)
def ips(start, end):
'''Return IPs in IPv4 range, inclusive.'''
start_int = int(ip_address(start).packed.hex(), 16)
end_int = int(ip_address(end).packed.hex(), 16)
return [ip_address(ip).exploded for ip in range(start_int, end_int)]
iprepo_global = ips('1.2.0.1', '1.2.20.254')
iprepo_private = ips('10.10.0.1', '10.10.20.254')
ip_map = {}
def map_ip(matchobj):
if maybe_ip(matchobj):
try:
ip = IPAddress(matchobj.group(0))
except AddrFormatError:
# might be something else than an IPv6 address
return matchobj.group(0)
if ip.version == 4:
if not str(ip) in ip_map:
if ip.is_private():
ip_map[str(ip)] = iprepo_private.pop(0)
else:
ip_map[str(ip)] = iprepo_global.pop(0)
return ip_map[str(ip)]
#return str(bitmask4 & ip)
else:
return str(bitmask6 & ip)
return str(ip)
else:
return matchobj.group(0)
def string_hash(_str):
_hash = list(hashlib.md5(_str.encode()).hexdigest())
ret = ''
for i in range(0,len(_hash),2):
ret += map_char(_hash[i] + _hash[i+1])
return ret
domain_map = {}
def map_domain(matchobj):
_domain = matchobj.group(0)
if re.match(syslog_prio, _domain) is not None:
return _domain
if not _domain in domain_map:
parts = _domain.split('.')
for idx in range(len(parts)):
parts[idx] = string_hash(parts[idx])
domain_map[_domain] = '.'.join(parts)
#print(_domain + '=' + domain_map[_domain])
return domain_map[_domain]
def map_char(hex):
ic = int(hex, 16)
offset = int(ic / 255 * 50)
if offset <= 25:
return chr(65 + offset)
else:
return chr(72 + offset)
string_map = {}
def map_string(_str):
if not _str in string_map:
string_map[_str] = string_hash(_str)
return string_map[_str]
while True:
line = fd_in.readline()
if not line:
break
# Phase 1 - search and replace pattern
for key, value in rule_data.iteritems():
for key, value in rule_data.items():
try:
linenew = value[0].sub(value[1], line)
replace = value[1]
if '_MAP_' in replace:
found = value[0].search(line)
if found:
_str = found.group(1)
replace = replace.replace('_MAP_', map_string(_str))
linenew = value[0].sub(replace, line)
if linenew is not None:
line = linenew
except Exception, e:
except Exception as e:
print >> sys.stderr, e
# Phase 2 - find IPv4/IPv6 address
line = re.sub(ipv4, reduce_ip, line)
line = re.sub(ipv4, map_ip, line)
line = re.sub(ipv6, reduce_ip, line)
# Phase 3 - search and replace domains
line = re.sub(domain, map_domain, line)
if test:
print line.strip()
print(line.strip())
else:
fd_out.write(line)
......
# copyright sys4 AG 2015
# This file is part of loganon.
#
# loganon is free software: you can redistribute it and/or modify it under the
# terms of the GNU Lesser General Public License as published by the Free
# Software Foundation, either version 3 of the License, or (at your option) any
# later version.
#
# loganon is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with loganon. If not, see <http://www.gnu.org/licenses/>.
exchange:
sender-address:
- search: "sender-address:([^@]+)@"
- replace: "sender-address:_MAP_@"
return-path:
- search: "return-path:([^@]+)@"
- replace: "return-path:_MAP_@"
recipient-address:
- search: "recipient-address:([^@]+)@"
- replace: "recipient-address:_MAP_@"
UserID:
- search: "UserID:([^,]+)"
- replace: "UserID:_MAP_"
AccountName:
- search: "AccountName:([^,]+)"
- replace: "AccountName:_MAP_"
Domain:
- search: "Domain:([^,]+)"
- replace: "Domain:_MAP_"
# vim: syn=yaml ts=2 sw=2 expandtab
# copyright sys4 AG 2015
# This file is part of loganon.
#
# loganon is free software: you can redistribute it and/or modify it under the
# terms of the GNU Lesser General Public License as published by the Free
# Software Foundation, either version 3 of the License, or (at your option) any
# later version.
#
# loganon is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with loganon. If not, see <http://www.gnu.org/licenses/>.
fortigate:
devname:
- search: "devname=[^ ]+"
- replace: "devname=DEVNAME"
devid:
- search: "devid=[^ ]+"
- replace: "devid=DEVID"
#srcip:
# - search: "srcip=[^ ]+"
# - replace: "srcip=IP"
#dstip:
# - search: "dstip=[^ ]+"
# - replace: "dstip=IP"
srcintf:
- search: "srcintf=[^ ]+"
- replace: "srcintf=\"SRCINTF\""
dstintf:
- search: "dstintf=[^ ]+"
- replace: "dstintf=\"DSTINTF\""
srccountry:
- search: "srccountry=[^ ]+"
- replace: "srccountry=\"COUNTRY\""
dstcountry:
- search: "dstcountry=[^ ]+"
- replace: "dstcountry=\"COUNTRY\""
# vim: syn=yaml ts=2 sw=2 expandtab
# copyright sys4 AG 2015
# This file is part of loganon.
#
# loganon is free software: you can redistribute it and/or modify it under the
# terms of the GNU Lesser General Public License as published by the Free
# Software Foundation, either version 3 of the License, or (at your option) any
# later version.
#
# loganon is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with loganon. If not, see <http://www.gnu.org/licenses/>.
fortigate:
devname:
- search: "devname=[^ ]+"
- replace: "devname=DEVNAME"
devid:
- search: "devid=[^ ]+"
- replace: "devid=DEVID"
#srcip:
# - search: "srcip=[^ ]+"
# - replace: "srcip=IP"
#dstip:
# - search: "dstip=[^ ]+"
# - replace: "dstip=IP"
srcintf:
- search: "srcintf=[^ ]+"
- replace: "srcintf=SRCINTF"
dstintf:
- search: "dstintf=[^ ]+"
- replace: "dstintf=DSTINTF"
srccountry:
- search: "srccountry=[^ ]+"
- replace: "srccountry=\"COUNTRY\""
dstcountry:
- search: "dstcountry=[^ ]+"
- replace: "dstcountry=\"COUNTRY\""
# vim: syn=yaml ts=2 sw=2 expandtab
# copyright sys4 AG 2015
# This file is part of loganon.
#
# loganon is free software: you can redistribute it and/or modify it under the
# terms of the GNU Lesser General Public License as published by the Free
# Software Foundation, either version 3 of the License, or (at your option) any
# later version.
#
# loganon is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with loganon. If not, see <http://www.gnu.org/licenses/>.
webproxy:
user:
- search: "user=\"([^\"]+)\""
- replace: "user=\"_MAP_\""
# vim: syn=yaml ts=2 sw=2 expandtab
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment