package com.example; import akka.actor.typed.ActorRef; import akka.actor.typed.Behavior; import akka.actor.typed.javadsl.*; import java.time.Duration; import java.util.ArrayList; import java.util.List; import static com.example.AkkaMainSystem.getDistributionCenter; public class DeliveryCar extends AbstractBehavior { private final ArrayList pakete = new ArrayList<>(); private final ArrayList> route; private final int maxCapacity = 3; private final TimerScheduler timers; private int globalIndex = 0; public interface Message {} public record LoadMessage(List pakete) implements Message {} public record PickupResponse(Paket paket) implements Message {} public record StartRoute() implements Message {} public record CheckStop() implements Message {} private DeliveryCar(ActorContext context, TimerScheduler timers, ArrayList> route) { super(context); this.route = route; this.timers = timers; } public static Behavior create(List> route) { return Behaviors.setup(context -> Behaviors.withTimers(timers -> new DeliveryCar(context, timers, new ArrayList<>(route)))); } @Override public Receive createReceive() { return newReceiveBuilder() .onMessage(PickupResponse.class, this::onPickupResponse) .onMessage(LoadMessage.class, this::onLoadMessage) .onMessage(StartRoute.class, this::onStartRoute) .onMessage(CheckStop.class, this::onCheckStop) .build(); } private Behavior onLoadMessage(LoadMessage msg) { pakete.addAll(msg.pakete); scheduleNextStopCheck(); return this; } private Behavior onPickupResponse(PickupResponse rsp) { if (rsp.paket == null || rsp.paket.inhalt == null) { scheduleNextStopCheck(); } else { if (pakete.size() < maxCapacity) { pakete.add(rsp.paket); getContext().getLog().info("Current number of packages in the truck: {}", pakete.size()); } scheduleNextStopCheck(); } return this; } private Behavior onCheckStop(CheckStop stop) { if (globalIndex >= route.size()) { getContext().getLog().info("Car is returning to the distribution center."); DistributionCenter.ArriveMessage antwort = new DistributionCenter.ArriveMessage(pakete, getContext().getSelf()); getDistributionCenter().tell(antwort); globalIndex = 0; } else { getContext().getLog().info("I am at {}", route.get(globalIndex).path().name()); if (pakete.size() >= maxCapacity) { deliverPackages(); } else { sendPickupMessage(); } globalIndex++; } scheduleNextStopCheck(); return this; } private Behavior onStartRoute(StartRoute msg) { globalIndex = 0; scheduleNextStopCheck(); return this; } private void sendPickupMessage() { if (globalIndex < route.size()) { ActorRef nextCustomer = route.get(globalIndex); nextCustomer.tell(new Customer.PickUpMessage(getContext().getSelf(), "Car can pick up package")); } else { getContext().getLog().info("No more customers in the route."); } } private void deliverPackages() { if (globalIndex < route.size()) { ActorRef nextCustomer = route.get(globalIndex); for (int i = 0; i < pakete.size(); i++) { Paket paket = pakete.get(i); if (paket.empfaenger == nextCustomer) { nextCustomer.tell(new Customer.DeliveryMessage(paket)); pakete.remove(i); i--; // Adjust index since an element is removed } } } } private void scheduleNextStopCheck() { timers.startSingleTimer(new CheckStop(), Duration.ofSeconds(3)); } }