274 lines
9.1 KiB
Python
274 lines
9.1 KiB
Python
#!/usr/bin/python
|
|
# -*- coding: utf-8 -*-
|
|
#
|
|
# Copyright 2014 The Plaso Project Authors.
|
|
# Please see the AUTHORS file for details on individual authors.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
"""Parser for Linux UTMP files."""
|
|
|
|
import construct
|
|
import logging
|
|
import os
|
|
import socket
|
|
|
|
from plaso.lib import errors
|
|
from plaso.lib import event
|
|
from plaso.lib import eventdata
|
|
from plaso.lib import timelib
|
|
from plaso.parsers import interface
|
|
from plaso.parsers import manager
|
|
|
|
|
|
__author__ = 'Joaquin Moreno Garijo (Joaquin.MorenoGarijo.2013@live.rhul.ac.uk)'
|
|
|
|
|
|
class UtmpEvent(event.EventObject):
|
|
"""Convenience class for an UTMP event."""
|
|
|
|
DATA_TYPE = 'linux:utmp:event'
|
|
|
|
def __init__(
|
|
self, timestamp, microsecond, user, computer_name,
|
|
terminal, status, ip_address, structure):
|
|
"""Initializes the event object.
|
|
|
|
Args:
|
|
timestamp: Epoch when the terminal was started.
|
|
microsecond: number of microseconds related with timestamp.
|
|
user: active user name.
|
|
computer_name: name of the computer.
|
|
terminal: type of terminal.
|
|
status: login status.
|
|
ip_address: ip_address from the connection is done.
|
|
structure: entry structure parsed.
|
|
exit: integer that represents the exit status.
|
|
pid: integer with the process ID.
|
|
terminal_id: integer with the Inittab ID.
|
|
"""
|
|
super(UtmpEvent, self).__init__()
|
|
self.timestamp = timelib.Timestamp.FromPosixTimeWithMicrosecond(
|
|
timestamp, microsecond)
|
|
self.timestamp_desc = eventdata.EventTimestamp.START_TIME
|
|
self.user = user
|
|
self.computer_name = computer_name
|
|
self.terminal = terminal
|
|
self.status = status
|
|
self.ip_address = ip_address
|
|
self.exit = structure.exit
|
|
self.pid = structure.pid
|
|
self.terminal_id = structure.terminal_id
|
|
|
|
|
|
class UtmpParser(interface.BaseParser):
|
|
"""Parser for Linux/Unix UTMP files."""
|
|
|
|
NAME = 'utmp'
|
|
DESCRIPTION = u'Parser for Linux/Unix UTMP files.'
|
|
|
|
LINUX_UTMP_ENTRY = construct.Struct(
|
|
'utmp_linux',
|
|
construct.ULInt32('type'),
|
|
construct.ULInt32('pid'),
|
|
construct.String('terminal', 32),
|
|
construct.ULInt32('terminal_id'),
|
|
construct.String('username', 32),
|
|
construct.String('hostname', 256),
|
|
construct.ULInt16('termination'),
|
|
construct.ULInt16('exit'),
|
|
construct.ULInt32('session'),
|
|
construct.ULInt32('timestamp'),
|
|
construct.ULInt32('microsecond'),
|
|
construct.ULInt32('address_a'),
|
|
construct.ULInt32('address_b'),
|
|
construct.ULInt32('address_c'),
|
|
construct.ULInt32('address_d'),
|
|
construct.Padding(20))
|
|
|
|
LINUX_UTMP_ENTRY_SIZE = LINUX_UTMP_ENTRY.sizeof()
|
|
|
|
STATUS_TYPE = {
|
|
0: 'EMPTY',
|
|
1: 'RUN_LVL',
|
|
2: 'BOOT_TIME',
|
|
3: 'NEW_TIME',
|
|
4: 'OLD_TIME',
|
|
5: 'INIT_PROCESS',
|
|
6: 'LOGIN_PROCESS',
|
|
7: 'USER_PROCESS',
|
|
8: 'DEAD_PROCESS',
|
|
9: 'ACCOUNTING'}
|
|
|
|
# Set a default test value for few fields, this is supposed to be a text
|
|
# that is highly unlikely to be seen in a terminal field, or a username field.
|
|
# It is important that this value does show up in such fields, but otherwise
|
|
# it can be a free flowing text field.
|
|
_DEFAULT_TEST_VALUE = u'Ekki Fraedilegur Moguleiki, thetta er bull ! = + _<>'
|
|
|
|
def Parse(self, parser_context, file_entry, parser_chain=None):
|
|
"""Extract data from an UTMP file.
|
|
|
|
Args:
|
|
parser_context: A parser context object (instance of ParserContext).
|
|
file_entry: A file entry object (instance of dfvfs.FileEntry).
|
|
parser_chain: Optional string containing the parsing chain up to this
|
|
point. The default is None.
|
|
"""
|
|
file_object = file_entry.GetFileObject()
|
|
try:
|
|
structure = self.LINUX_UTMP_ENTRY.parse_stream(file_object)
|
|
except (IOError, construct.FieldError) as exception:
|
|
file_object.close()
|
|
raise errors.UnableToParseFile(
|
|
u'Unable to parse UTMP Header with error: {0:s}'.format(exception))
|
|
|
|
if structure.type not in self.STATUS_TYPE:
|
|
file_object.close()
|
|
raise errors.UnableToParseFile((
|
|
u'Not an UTMP file, unknown type '
|
|
u'[{0:d}].').format(structure.type))
|
|
|
|
if not self._VerifyTextField(structure.terminal):
|
|
file_object.close()
|
|
raise errors.UnableToParseFile(
|
|
u'Not an UTMP file, unknown terminal.')
|
|
|
|
if not self._VerifyTextField(structure.username):
|
|
file_object.close()
|
|
raise errors.UnableToParseFile(
|
|
u'Not an UTMP file, unknown username.')
|
|
|
|
if not self._VerifyTextField(structure.hostname):
|
|
file_object.close()
|
|
raise errors.UnableToParseFile(
|
|
u'Not an UTMP file, unknown hostname.')
|
|
|
|
# Check few values.
|
|
terminal = self._GetTextFromNullTerminatedString(
|
|
structure.terminal, self._DEFAULT_TEST_VALUE)
|
|
if terminal == self._DEFAULT_TEST_VALUE:
|
|
raise errors.UnableToParseFile(
|
|
u'Not an UTMP file, no terminal set.')
|
|
|
|
username = self._GetTextFromNullTerminatedString(
|
|
structure.username, self._DEFAULT_TEST_VALUE)
|
|
|
|
if username == self._DEFAULT_TEST_VALUE:
|
|
raise errors.UnableToParseFile(
|
|
u'Not an UTMP file, no username set.')
|
|
|
|
if not structure.timestamp:
|
|
raise errors.UnableToParseFile(
|
|
u'Not an UTMP file, no timestamp set in the first record.')
|
|
|
|
# Add ourselves to the parser chain, which will be used in all subsequent
|
|
# event creation in this parser.
|
|
parser_chain = self._BuildParserChain(parser_chain)
|
|
|
|
file_object.seek(0, os.SEEK_SET)
|
|
event_object = self._ReadUtmpEvent(file_object)
|
|
while event_object:
|
|
event_object.offset = file_object.tell()
|
|
parser_context.ProduceEvent(
|
|
event_object, file_entry=file_entry, parser_chain=None)
|
|
|
|
event_object = self._ReadUtmpEvent(file_object)
|
|
|
|
file_object.close()
|
|
|
|
def _VerifyTextField(self, text):
|
|
"""Check if a bytestream is a null terminated string.
|
|
|
|
Args:
|
|
event_object: text field from the structure.
|
|
|
|
Return:
|
|
True if it is a null terminated string, False otherwise.
|
|
"""
|
|
_, _, null_chars = text.partition('\x00')
|
|
if not null_chars:
|
|
return False
|
|
return len(null_chars) == null_chars.count('\x00')
|
|
|
|
def _ReadUtmpEvent(self, file_object):
|
|
"""Returns an UtmpEvent from a single UTMP entry.
|
|
|
|
Args:
|
|
file_object: a file-like object that points to an UTMP file.
|
|
|
|
Returns:
|
|
An event object constructed from a single UTMP record or None if we
|
|
have reached the end of the file (or EOF).
|
|
"""
|
|
offset = file_object.tell()
|
|
data = file_object.read(self.LINUX_UTMP_ENTRY_SIZE)
|
|
if not data or len(data) != self.LINUX_UTMP_ENTRY_SIZE:
|
|
return
|
|
try:
|
|
entry = self.LINUX_UTMP_ENTRY.parse(data)
|
|
except (IOError, construct.FieldError):
|
|
logging.warning((
|
|
u'UTMP entry at 0x{:x} couldn\'t be parsed.').format(offset))
|
|
return self.__ReadUtmpEvent(file_object)
|
|
|
|
user = self._GetTextFromNullTerminatedString(entry.username)
|
|
terminal = self._GetTextFromNullTerminatedString(entry.terminal)
|
|
if terminal == '~':
|
|
terminal = u'system boot'
|
|
computer_name = self._GetTextFromNullTerminatedString(entry.hostname)
|
|
if computer_name == u'N/A' or computer_name == u':0':
|
|
computer_name = u'localhost'
|
|
status = self.STATUS_TYPE.get(entry.type, u'N/A')
|
|
|
|
if not entry.address_b:
|
|
try:
|
|
ip_address = socket.inet_ntoa(
|
|
construct.ULInt32('int').build(entry.address_a))
|
|
if ip_address == '0.0.0.0':
|
|
ip_address = u'localhost'
|
|
except (IOError, construct.FieldError, socket.error):
|
|
ip_address = u'N/A'
|
|
else:
|
|
ip_address = u'{0:d}.{1:d}.{2:d}.{3:d}'.format(
|
|
entry.address_a, entry.address_b, entry.address_c, entry.address_d)
|
|
|
|
return UtmpEvent(
|
|
entry.timestamp, entry.microsecond, user, computer_name, terminal,
|
|
status, ip_address, entry)
|
|
|
|
def _GetTextFromNullTerminatedString(
|
|
self, null_terminated_string, default_string=u'N/A'):
|
|
"""Get a UTF-8 text from a raw null terminated string.
|
|
|
|
Args:
|
|
null_terminated_string: Raw string terminated with null character.
|
|
default_string: The default string returned if the parser fails.
|
|
|
|
Returns:
|
|
A decoded UTF-8 string or if unable to decode, the supplied default
|
|
string.
|
|
"""
|
|
text, _, _ = null_terminated_string.partition('\x00')
|
|
try:
|
|
text = text.decode('utf-8')
|
|
except UnicodeDecodeError:
|
|
logging.warning(
|
|
u'[UTMP] Decode UTF8 failed, the message string may be cut short.')
|
|
text = text.decode('utf-8', 'ignore')
|
|
if not text:
|
|
return default_string
|
|
return text
|
|
|
|
|
|
manager.ParsersManager.RegisterParser(UtmpParser)
|