Skip to content

Commit

Permalink
Merge pull request #9 from pbs/ipaddress_integration
Browse files Browse the repository at this point in the history
Added ipaddress support for CIDR networks
  • Loading branch information
danclaudiupop committed Apr 11, 2016
2 parents c808f8d + 242bd50 commit 6174f95
Show file tree
Hide file tree
Showing 5 changed files with 109 additions and 6 deletions.
13 changes: 9 additions & 4 deletions setup.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,20 @@
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
import sys
from glob import glob
from os.path import basename
from os.path import splitext

from setuptools import find_packages
from setuptools import setup

requires = [
'Django>=1.6,<1.9',
'psutil==4.0.0',
]

if sys.version_info < (3, 3, 0):
requires.append('py2-ipaddress>=3.4.0')

setup(
name='django-heartbeat',
Expand All @@ -24,8 +32,5 @@
py_modules=[splitext(basename(path))[0] for path in glob('src/*.py')],
include_package_data=True,
zip_safe=False,
install_requires=[
'Django>=1.6,<1.9',
'psutil==4.0.0',
]
install_requires=requires
)
27 changes: 25 additions & 2 deletions src/heartbeat/auth.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,18 @@
import re
import base64

import logging
from functools import wraps
from ipaddress import ip_address, ip_network

from .settings import HEARTBEAT
from django.http import HttpResponse
from django.core.exceptions import ImproperlyConfigured

logging.basicConfig(
format='%(levelname)s - %(message)s',
level=logging.INFO)
logger = logging.getLogger(__name__)


def auth(func):
@wraps(func)
Expand All @@ -16,7 +22,7 @@ def _decorator(request, *args, **kwargs):
return func(request, *args, **kwargs)
if 'authorized_ips' in auth:
ip = get_client_ip(request)
if ip in auth['authorized_ips']:
if is_authorized(ip, auth['authorized_ips']):
return func(request, *args, **kwargs)
prepare_credentials(auth)
if request.META.get('HTTP_AUTHORIZATION'):
Expand Down Expand Up @@ -80,3 +86,20 @@ def get_client_ip(request):
continue
else:
return ip


def is_authorized(ip, authorized_ips):
ip = ip_address(ip)

for item in authorized_ips:
try:
if ip == ip_address(item):
return True
except ValueError:
try:
if ip in ip_network(item):
return True
except ValueError:
logger.warn('The "authorized_ip" list (settings.HEARTBEAT)'
'contains an item that is neither an ip address '
'nor an ip network: {}'.format(item))
46 changes: 46 additions & 0 deletions tests/test_auth.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import pytest
from django.conf import settings
if not settings.configured:
settings.configure()

from heartbeat.auth import is_authorized


def test_is_authorized_with_exact_valid_ip():
authorized_ips = ['1.3.3.7']
assert is_authorized('1.3.3.7', authorized_ips)


def test_is_authorized_with_exact_not_authorized_ip():
authorized_ips = ['1.0.0.0', '1.3.3.0', '1.3.3.6']
assert not is_authorized('1.3.3.7', authorized_ips)


def test_is_authorized_with_invalid_ip_address():
authorized_ips = ['1.3.3.7']
with pytest.raises(ValueError) as e:
is_authorized('foo', authorized_ips)
msg = '\'foo\' does not appear to be an IPv4 or IPv6 address'
assert msg in str(e)


def test_is_authorized_with_invalid_ip_network():
authorized_ips = ['1.3.3.7/foo', '1.3.3.7/33', '1.3.3.7/1337']
assert not is_authorized('1.3.3.7', authorized_ips)


@pytest.mark.parametrize('ip', ['1.3.3.1', '1.3.3.127'])
def test_is_authorized_valid_ip_with_valid_ip_network(ip):
authorized_ips = ['1.3.3.0/25']
assert is_authorized(ip, authorized_ips)


@pytest.mark.parametrize('ip', ['1.3.3.8', '1.3.3.255'])
def test_is_authorized_invalid_ip_with_valid_ip_network(ip):
authorized_ips = ['1.3.3.0/29']
assert not is_authorized(ip, authorized_ips)


def test_is_authorized_bad_authorized_ips_config_does_not_raise():
authorized_ips = ['foo', '1.3.3.7/foo', '1']
assert not is_authorized('1.3.3.7', authorized_ips)
28 changes: 28 additions & 0 deletions tests/test_views.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,34 @@ def test_disabled_auth(self):
json_response = json.loads(response.content.decode('utf-8'))
assert json_response['test_views']['ping'] == 'pong'

@pytest.mark.parametrize('ip,status',
[('1.3.2.7', 200), ('1.3.2.127', 200),
('1.3.2.128', 401), ('1.3.3.7', 401)])
def test_authorized_ips_with_ip_network(self, ip, status):
self.heartbeat['auth'].update({'username': 'blow', 'password': 'fish'})
self.heartbeat['auth']['authorized_ips'] = ['1.3.2.0/25']
request = self.factory.get(
reverse('1337'), **{'REMOTE_ADDR': ip})
response = details(request)
assert response.status_code == status

def test_authorized_ips_bad_ip_list_falls_back_to_basic_auth(self):
bad_authorized_ips = ['foo', '1', '1.3.3.7/256', '1.3.2.0/foo']
self.heartbeat['auth']['authorized_ips'] = bad_authorized_ips
request = self.factory.get(
reverse('1337'), **{'REMOTE_ADDR': '1.3.3.7'})
response = details(request)
assert response.status_code == 200

def test_authorized_ips_bad_ip_list_falls_back_to_bad_basic_auth(self):
bad_authorized_ips = ['foo', '1', '1.3.3.7/256', '1.3.2.0/foo']
self.heartbeat['auth'].update({'username': 'blow', 'password': 'fish'})
self.heartbeat['auth']['authorized_ips'] = bad_authorized_ips
request = self.factory.get(
reverse('1337'), **{'REMOTE_ADDR': '1.3.3.7'})
response = details(request)
assert response.status_code == 401

def test(self):
self.heartbeat['auth'].update({'username': 'blow', 'password': 'fish'})
request = self.factory.get(
Expand Down
1 change: 1 addition & 0 deletions tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ setenv =
changedir = {toxinidir}/tests/
deps =
pytest==2.8.7
py27: py2-ipaddress>=3.4.0
py{27,34}: mock==1.3.0
django16: Django>=1.6,<1.7
django17: Django>=1.7,<1.8
Expand Down

0 comments on commit 6174f95

Please sign in to comment.