#!/usr/bin/env python3
#
# Sample application for historical flows extraction
#
import os
import sys
import getopt
import time
sys.path.insert(0, '../')
from ntopng.ntopng import Ntopng
# Connection defaults; each one can be overridden from the command line.
# NOTE(review): admin/admin is a sample login only. The default URL uses
# plain http, so whatever credentials Ntopng sends would presumably travel
# unencrypted -- use an https:// URL when ntopng is not on localhost.
ntopng_url = "http://localhost:3000"
username = "admin"
password = "admin"
auth_token = None  # handed to Ntopng together with username/password
iface_id = 0
enable_debug = False

# Query defaults: a one-hour window ending now, capped at ten result rows.
maxhits = 10
epoch_end = int(time.time())
epoch_begin = epoch_end - 60 * 60
host_ip = "192.168.1.1"
##########
def usage():
    """Print the command-line synopsis and two sample invocations, then exit.

    The process always exits with status 0, so a caller that wants to
    report a failure has to set the exit status itself.
    """
    help_lines = (
        "historical_flows.py [-u <username>] [-p <password>] [-t <auth token>] [-n <ntopng_url>]",
        " [-i <interface ID>] [-H <host IP>] [--debug] [--help]",
        "",
        "Example: ./historical_flows.py -t ce0e284c774fac5a3e981152d325cfae -i 4",
        " ./historical_flows.py -u ntop -p mypassword -i 4",
    )
    print("\n".join(help_lines))
    sys.exit(0)
##########
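# Parse the command line; each recognised option overrides one default.
# NOTE(review): values given with -p/-t are visible to other local users in
# the process list and usually end up in shell history.
try:
    opts, args = getopt.getopt(
        sys.argv[1:],
        "hdu:p:n:i:H:t:",
        [
            "help",
            "debug",
            "username=",
            "password=",
            "ntopng_url=",
            "iface_id=",
            "host_ip=",
            "auth_token=",
        ],
    )
except getopt.GetoptError as err:
    print(err)
    try:
        usage()
    except SystemExit:
        # usage() exits with status 0, which made the sys.exit(2) below
        # unreachable and reported a bad command line as success.
        pass
    sys.exit(2)

for o, v in opts:
    if o in ("-h", "--help"):
        usage()
    elif o in ("-d", "--debug"):
        enable_debug = True
    elif o in ("-u", "--username"):
        username = v
    elif o in ("-p", "--password"):
        password = v
    elif o in ("-n", "--ntopng_url"):
        ntopng_url = v
    elif o in ("-i", "--iface_id"):
        # Stays a string here (the default is the int 0) and is passed
        # unchanged to get_historical_interface().
        iface_id = v
    elif o in ("-H", "--host_ip"):
        # Not referenced by any query in this file. If it is ever spliced
        # into a where clause, validate it as an IP address first.
        host_ip = v
    elif o in ("-t", "--auth_token"):
        auth_token = v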
##########
def format_rsp(rsp):
    """Print every row of the response `rsp` on its own line."""
    for line in map(str, rsp):
        print(line)
def top_x_remote_ipv4_hosts(my_historical, epoch_begin, epoch_end, maxhits):
    """Print up to `maxhits` IPv4 destination addresses ranked by summed bytes.

    Runs my_historical.get_flows over [epoch_begin, epoch_end], filtered on
    SERVER_LOCATION=1 (presumably "remote server" -- confirm against the
    ntopng flow schema), grouped by destination address and sorted by the
    byte total in descending order.
    """
    # Every clause is a fixed literal; nothing from the command line is
    # spliced into the query text.
    rows = my_historical.get_flows(
        epoch_begin,
        epoch_end,
        "IPV4_DST_ADDR,SUM(TOTAL_BYTES) TOT",  # select
        "(SERVER_LOCATION=1)",  # where
        maxhits,
        "IPV4_DST_ADDR_FORMATTED",  # group by
        "TOT DESC",  # order by
    )
    format_rsp(rows)
def top_x_remote_ipv4_hosts_ports(my_historical, epoch_begin, epoch_end, maxhits):
    """Print up to `maxhits` (IPv4 destination, destination port) pairs by bytes.

    Runs my_historical.get_flows over [epoch_begin, epoch_end], filtered on
    SERVER_LOCATION=1 (presumably "remote server" -- confirm against the
    ntopng flow schema), grouped by destination address and port, and
    sorted by the byte total in descending order.
    """
    # Every clause is a fixed literal; nothing from the command line is
    # spliced into the query text.
    rows = my_historical.get_flows(
        epoch_begin,
        epoch_end,
        "IPV4_DST_ADDR,SUM(TOTAL_BYTES) TOT,IP_DST_PORT",  # select
        "(SERVER_LOCATION=1)",  # where
        maxhits,
        "IPV4_DST_ADDR_FORMATTED,IP_DST_PORT",  # group by
        "TOT DESC",  # order by
    )
    format_rsp(rows)
def top_x_remote_ports(my_historical, epoch_begin, epoch_end, maxhits):
    """Print up to `maxhits` destination ports ranked by summed bytes.

    Runs my_historical.get_flows over [epoch_begin, epoch_end], filtered on
    SERVER_LOCATION=1 (presumably "remote server" -- confirm against the
    ntopng flow schema), grouped by destination port and sorted by the
    byte total in descending order.
    """
    # Every clause is a fixed literal; nothing from the command line is
    # spliced into the query text.
    rows = my_historical.get_flows(
        epoch_begin,
        epoch_end,
        "SUM(TOTAL_BYTES) TOT,IP_DST_PORT",  # select
        "(SERVER_LOCATION=1)",  # where
        maxhits,
        "IP_DST_PORT",  # group by
        "TOT DESC",  # order by
    )
    format_rsp(rows)
##########
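# Build the ntopng client from the (possibly overridden) settings.
try:
    my_ntopng = Ntopng(username, password, auth_token, ntopng_url)
    if enable_debug:
        my_ntopng.enable_debug()
except ValueError as e:
    print(e)
    # os._exit() skips the interpreter's normal shutdown, so flush first or
    # the message is lost when stdout is a pipe or a file.
    sys.stdout.flush()
    os._exit(-1)

# Run the three sample reports against the selected interface.
try:
    my_historical = my_ntopng.get_historical_interface(iface_id)

    print("\n==========================\nTop X Remote Hosts Traffic")
    top_x_remote_ipv4_hosts(my_historical, epoch_begin, epoch_end, maxhits)

    print("\n==========================\nTop X Remote Host/Ports Traffic")
    top_x_remote_ipv4_hosts_ports(my_historical, epoch_begin, epoch_end, maxhits)

    print("\n==========================\nTop X Remote Ports Traffic")
    top_x_remote_ports(my_historical, epoch_begin, epoch_end, maxhits)
except ValueError as e:
    print(e)
    sys.stdout.flush()
    os._exit(-1)

# Flush buffered report output before the hard exit (see above).
sys.stdout.flush()
os._exit(0)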