# pylint: skip-file
import os

rank = os.environ.get('RANK', -1)
print("rank:", rank)
print("world_size:", os.environ.get("WORLD_SIZE", -1))
print("local_rank:", os.environ.get("LOCAL_RANK", -1))

# Running remotely (RANK is set): install the hpo_tools wheel.
if rank != -1:
    os.system(
        "pip3 install https://automl-nni.oss-cn-beijing.aliyuncs.com/nni/hpo_tools/hpo_tools-0.2.1-py3-none-any.whl")

# Check whether Torch is already installed.
try:
    import torch
except ImportError:
    # If not installed, install Torch and torchvision with pip.
    os.system("pip3 install torch torchvision --trusted-host yum.tbsite.net -i http://yum.tbsite.net/pypi/simple/")

import logging
import pathlib
import argparse

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision import datasets, transforms
from torch.optim.lr_scheduler import StepLR
import torch.distributed as dist

logger = logging.getLogger('mnist_AutoML')


def init_distributed_mode(args):
    """Initialize DDP from environment variables (torchrun/env:// or SLURM)."""
    if "RANK" in os.environ and "WORLD_SIZE" in os.environ:
        args['rank'] = int(os.environ["RANK"])
        args['world_size'] = int(os.environ["WORLD_SIZE"])
    elif "SLURM_PROCID" in os.environ:
        args['rank'] = int(os.environ["SLURM_PROCID"])
    elif hasattr(args, "rank"):
        pass
    else:
        print("Not using distributed mode")
        args['distributed'] = False
        return

    args['distributed'] = True
    if torch.cuda.device_count() > 0:
        args['local_rank'] = args['rank'] % torch.cuda.device_count()
    if args['use_cuda']:
        torch.cuda.set_device(args['local_rank'])
    print(
        f"| distributed init (rank {args['rank']}): {args['dist_url']}, "
        f"local rank:{args['local_rank']}, world size:{args['world_size']}",
        flush=True)
    dist.init_process_group(backend=args['backend'],
                            init_method=args['dist_url'],
                            world_size=args['world_size'],
                            rank=args['rank'])
    print("init success")


class Net(nn.Module):

    def __init__(self):
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(1, 32, 3, 1)
        self.conv2 = nn.Conv2d(32, 64, 3, 1)
        self.dropout1 = nn.Dropout(0.25)
        self.dropout2 = nn.Dropout(0.5)
        self.fc1 = nn.Linear(9216, 128)
        self.fc2 = nn.Linear(128, 10)

    def forward(self, x):
        x = self.conv1(x)
        x = F.relu(x)
        x = self.conv2(x)
        x = F.relu(x)
        x = F.max_pool2d(x, 2)
        x = self.dropout1(x)
        x = torch.flatten(x, 1)
        x = self.fc1(x)
        x = F.relu(x)
        x = self.dropout2(x)
        x = self.fc2(x)
        output = F.log_softmax(x, dim=1)
        return output


def train(args, model, device, train_loader, optimizer, epoch):
    model.train()
    for batch_idx, (data, target) in enumerate(train_loader):
        data, target = data.to(device), target.to(device)
        optimizer.zero_grad()
        output = model(data)
        loss = F.nll_loss(output, target)
        loss.backward()
        optimizer.step()
        if args['distributed']:
            # In distributed mode, only rank 0 logs training progress.
            if dist.get_rank() == 0:
                if batch_idx % args['log_interval'] == 0:
                    print('Train Epoch: {}/{} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
                        epoch, batch_idx,
                        dist.get_world_size() * batch_idx * len(data),
                        len(train_loader.dataset),
                        100. * batch_idx / len(train_loader),
                        loss.item()))
        else:
            if batch_idx % args['log_interval'] == 0:
                print('Train Epoch: {}/{} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
                    epoch, batch_idx,
                    batch_idx * len(data),
                    len(train_loader.dataset),
                    100. * batch_idx / len(train_loader),
                    loss.item()))
                if args['dry_run']:
                    break
def test(model, device, test_loader):
    model.eval()
    test_loss = 0
    correct = 0
    with torch.no_grad():
        for data, target in test_loader:
            data, target = data.to(device), target.to(device)
            output = model(data)
            test_loss += F.nll_loss(output, target, reduction='sum').item()  # sum up batch loss
            pred = output.argmax(dim=1, keepdim=True)  # get the index of the max log-probability
            correct += pred.eq(target.view_as(pred)).sum().item()

    test_loss /= len(test_loader.dataset)
    accuracy = 100. * correct / len(test_loader.dataset)

    print('\nTest set: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)\n'.format(
        test_loss, correct, len(test_loader.dataset), accuracy))

    return accuracy


def get_params():
    # Training settings
    parser = argparse.ArgumentParser(description='PyTorch MNIST Example')
    parser.add_argument("--data_dir", type=str,
                        default='../data/mnist/', help="data directory")
    parser.add_argument('--batch_size', type=int, default=256, metavar='N',
                        help='input batch size for training (default: 256)')
    parser.add_argument('--test-batch-size', type=int, default=1000, metavar='N',
                        help='input batch size for testing (default: 1000)')
    parser.add_argument('--epochs', type=int, default=5, metavar='N',
                        help='number of epochs to train (default: 5)')
    parser.add_argument('--lr', type=float, default=1.0, metavar='LR',
                        help='learning rate (default: 1.0)')
    parser.add_argument('--gamma', type=float, default=0.7, metavar='M',
                        help='learning rate step gamma (default: 0.7)')
    parser.add_argument('--no-cuda', action='store_true', default=False,
                        help='disables CUDA training')
    parser.add_argument('--dry-run', action='store_true', default=False,
                        help='quickly check a single pass')
    parser.add_argument('--seed', type=int, default=1, metavar='S',
                        help='random seed (default: 1)')
    parser.add_argument('--log-interval', type=int, default=10, metavar='N',
                        help='how many batches to wait before logging training status')
    parser.add_argument('--save_model', type=str, default='../model',
                        help='directory for saving the current model')
    parser.add_argument('--local_rank', type=int,
                        help='local rank, will be passed by DDP')
    parser.add_argument("--world-size", default=1, type=int,
                        help="number of distributed processes")
    parser.add_argument("--dist-url", default="env://", type=str,
                        help="url used to set up distributed training")
    parser.add_argument("--metric_filepath", default="../metric", type=str,
                        help="oss dir to store the metric file")
    parser.add_argument("--stdout_filepath", default="../stdout", type=str,
                        help="oss dir to store the stdout file")
    parser.add_argument('--backend', type=str, help='distributed backend',
                        choices=[dist.Backend.GLOO, dist.Backend.NCCL, dist.Backend.MPI],
                        default=dist.Backend.GLOO)

    args, _ = parser.parse_known_args()
    return args


def main(args):
    print(args['batch_size'])
    print(args['no_cuda'])
    use_cuda = not args['no_cuda'] and torch.cuda.is_available()
    args['use_cuda'] = use_cuda

    init_distributed_mode(args)

    torch.manual_seed(args['seed'])

    device = torch.device("cuda" if use_cuda else "cpu")
    print("device:", device)

    train_kwargs = {'batch_size': args['batch_size']}
    test_kwargs = {'batch_size': args['test_batch_size']}
    if use_cuda:
        cuda_kwargs = {
            'num_workers': 1,
            'pin_memory': True,
        }
        train_kwargs.update(cuda_kwargs)
        test_kwargs.update(cuda_kwargs)

    transform = transforms.Compose([transforms.ToTensor(),
                                    transforms.Normalize((0.1307,), (0.3081,))])
    dataset_train = datasets.MNIST(args['data_dir'], train=True, download=True,
                                   transform=transform)
    dataset_val = datasets.MNIST(args['data_dir'], train=False,
                                 transform=transform)

    if args['distributed']:
        train_sampler = torch.utils.data.distributed.DistributedSampler(dataset_train, shuffle=True)
    else:
        train_sampler = torch.utils.data.RandomSampler(dataset_train)
    test_sampler = torch.utils.data.SequentialSampler(dataset_val)

    train_loader = torch.utils.data.DataLoader(dataset_train, sampler=train_sampler, **train_kwargs)
    test_loader = torch.utils.data.DataLoader(dataset_val, sampler=test_sampler, **test_kwargs)

    model = Net().to(device)
    model_without_ddp = model
    if args['distributed']:
        if use_cuda:
            model = torch.nn.parallel.DistributedDataParallel(model,
                                                              device_ids=[args['local_rank']],
                                                              output_device=args['local_rank'])
        else:
            model = torch.nn.parallel.DistributedDataParallel(model)
        model_without_ddp = model.module

    optimizer = optim.Adadelta(model.parameters(), lr=args['lr'])
    scheduler = StepLR(optimizer, step_size=1, gamma=args['gamma'])

    for epoch in range(1, args['epochs'] + 1):
        if args['distributed']:
            train_sampler.set_epoch(epoch)
        train(args, model, device, train_loader, optimizer, epoch)
        if args['distributed']:
            # For simplicity, run validation only on the rank-0 process
            # instead of on every GPU.
            if dist.get_rank() == 0:
                test_acc = test(model_without_ddp, device, test_loader)
        else:
            test_acc = test(model, device, test_loader)
        scheduler.step()

        # Report the metric from rank 0 (or from the single process when not distributed).
        if (args['distributed'] and dist.get_rank() == 0) or not args['distributed']:
            metric_dict = {'auc': test_acc}
            from hpo_tools.core.metric.remote_write_metric import write_summary_metric
            write_summary_metric(args['metric_filepath'], metric_dict)

            metric_dict_stdout = {'validation: accuracy=': test_acc}
            from hpo_tools.core.metric.remote_write_metric import write_stdout_metric
            write_stdout_metric(args['stdout_filepath'], metric_dict_stdout)
            print(f'validation: accuracy={test_acc}')

    if args['save_model']:
        # Only save the model on the rank-0 process.
        if (args['distributed'] and dist.get_rank() == 0) or not args['distributed']:
            pathlib.Path(args['save_model']).mkdir(parents=True, exist_ok=True)
            path = os.path.join(args['save_model'], "mnist_cnn.pt")
            torch.save(model.state_dict(), path)

    if args['distributed']:
        dist.destroy_process_group()


if __name__ == '__main__':
    args = get_params()
    print("args:", args)
    main(vars(args))
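
# Example launch commands (a sketch only; the script filename and process count
# below are assumptions, not part of the original source):
#
#   # Single-process run, no DDP (RANK is unset, so distributed mode is skipped):
#   python3 mnist_ddp.py --data_dir ../data/mnist/ --epochs 5
#
#   # Distributed run via torchrun, which sets RANK / WORLD_SIZE / LOCAL_RANK so
#   # init_distributed_mode() can initialize the process group through dist_url="env://":
#   torchrun --nproc_per_node=2 mnist_ddp.py --backend gloo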